Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/11 04:40:14 UTC
[GitHub] [airflow] MaiHoangViet1809 opened a new issue, #26980: Output from traditional Task can not be used in task decorator without a *reduce* task
MaiHoangViet1809 opened a new issue, #26980:
URL: https://github.com/apache/airflow/issues/26980
### Apache Airflow version
2.4.1
### What happened
When building a flow that combines a traditional task with the TaskFlow API, I saw this error log:
```
[2022-10-11, 11:23:58 +07] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2378, in xcom_push
    XCom.set(
  File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/models/xcom.py", line 206, in set
    value = cls.serialize_value(
  File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/models/xcom.py", line 597, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/home/prod_account/apps/python3_10/lib/python3.10/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/home/prod_account/apps/python3_10/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/home/prod_account/apps/python3_10/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/home/prod_account/apps/python3_10/lib/python3.10/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type _LazyXComAccess is not JSON serializable
```
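The last frames show where it breaks: `XCom.serialize_value` calls `json.dumps(value)` on whatever the task returned, and here that value is a `_LazyXComAccess` proxy over the mapped sensor's outputs. `json` only encodes built-in types such as `list`, `dict`, `str` and numbers, so a custom list-like object is rejected even if it iterates like a list. The plain-Python sketch below reproduces the failure; `LazyResults` is a hypothetical stand-in for `_LazyXComAccess`, not Airflow's actual class, and `serialize` only mirrors the call shown in the traceback.

```python
import json
from collections.abc import Sequence


class LazyResults(Sequence):
    """Hypothetical stand-in for a lazy, list-like proxy such as _LazyXComAccess."""

    def __init__(self, fetch):
        # `fetch` is a zero-argument callable that loads the values on demand.
        self._fetch = fetch

    def __getitem__(self, index):
        return self._fetch()[index]

    def __len__(self):
        return len(self._fetch())


def serialize(value):
    # Mirrors the call in the traceback: json.dumps(value).encode('UTF-8')
    return json.dumps(value).encode("UTF-8")


lazy = LazyResults(lambda: ["s3://input/2022/10/11", "s3://data/2022/10/11"])

try:
    serialize(lazy)
except TypeError as exc:
    print(exc)  # Object of type LazyResults is not JSON serializable

# Materializing the proxy into a real list makes it serializable.
print(serialize(list(lazy)))
```

This is the same reason the "work flow" in the reproduction succeeds: `reduce_task` calls `list(...)` on its input before returning it.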
### What you think should happen instead
When an expanded (mapped) S3KeySensor feeds a downstream task, I currently have to wrap its output in an extra reduce task (one that returns a list or does an aggregation). Shouldn't the pushed result be reduced by default for use in the next task when that task is not itself a map/expand task?
### How to reproduce
This sample code includes both the working flow and the buggy flow:
```python
from datetime import datetime, timedelta
from typing import Dict, Union

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.base import PokeReturnValue

PARAMS = {
    "INPUT_FOLDER": "/input",
    "DATA_FOLDER": "/data",
}
DEBUG = True


class S3KeySensor_test(S3KeySensor):
    # Test double: skips the real S3 lookup (commented out below) and always
    # succeeds, pushing the comma-joined bucket keys as the sensor's XCom value.
    def poke(self, context) -> Union[bool, PokeReturnValue]:
        check_result = True  # all(self._check_key(key) for key in self.bucket_key)
        if check_result:
            return PokeReturnValue(is_done=True, xcom_value=",".join(self.bucket_key))
        else:
            return False


with DAG("AA.REPORT_BUG", start_date=datetime(2022, 1, 1), catchup=False, schedule=None, params=PARAMS) as dag:

    @task
    def generate_dependency():
        ctx = get_current_context()
        params = ctx["params"]
        logical_date = ctx["logical_date"]
        return [
            {"name": "test1", "bucket_key": f"s3:/{params['INPUT_FOLDER']}/{logical_date.strftime('%Y/%m/%d')}"},
            {"name": "test2", "bucket_key": f"s3:/{params['DATA_FOLDER']}/{logical_date.strftime('%Y/%m/%d')}"},
        ]

    @task(multiple_outputs=True)
    def map_dependency(data) -> Dict[str, str]:
        return {"bucket_key": data["bucket_key"]}

    @task
    def reduce_task(data):
        # Materializes the lazy sequence of mapped results into a plain list.
        return list(data or [])

    @task
    def non_reduce_task(arg_1):
        # Returns its input unchanged; the return value is pushed to XCom,
        # so it must be JSON-serializable.
        return arg_1

    # bug flow: the mapped sensor's output is passed straight to a non-mapped task
    list_dependency = generate_dependency.override(task_id="list_dependency")()
    dependencies = map_dependency.override(task_id="map_dependency").expand(data=list_dependency)
    list_task_sensor = S3KeySensor_test.partial(
        task_id="check_s3",
        wildcard_match=True,
        aws_conn_id="aws_default",
        timeout=300,
        poke_interval=120,
        mode="reschedule",
    ).expand_kwargs(dependencies)
    bug_task = non_reduce_task.override(task_id="bug_task", retries=3, retry_delay=timedelta(minutes=15))(
        arg_1=list_task_sensor.output
    )

    # work flow: the mapped sensor's output goes through reduce_task first
    list_dependency_2 = generate_dependency.override(task_id="list_dependency_2")()
    dependencies_2 = map_dependency.override(task_id="map_dependency_2").expand(data=list_dependency_2)
    list_task_sensor_2 = S3KeySensor_test.partial(
        task_id="check_s3_2",
        wildcard_match=True,
        aws_conn_id="aws_default",
        timeout=300,
        poke_interval=120,
        mode="reschedule",
    ).expand_kwargs(dependencies_2)
    normal_task = non_reduce_task.override(task_id="normal_task", retries=3, retry_delay=timedelta(minutes=15))(
        arg_1=reduce_task(list_task_sensor_2.output)
    )
```
### Operating System
RHEL 7
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==4.1.0
### Deployment
Other
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] boring-cyborg[bot] commented on issue #26980: Output from traditional Task can not be used in task decorator without a *reduce* task
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #26980:
URL: https://github.com/apache/airflow/issues/26980#issuecomment-1274077957
Thanks for opening your first issue here! Be sure to follow the issue template!
[GitHub] [airflow] potiuk commented on issue #26980: Output from traditional Task can not be used in task decorator without a *reduce* task
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #26980:
URL: https://github.com/apache/airflow/issues/26980#issuecomment-1287947014
I believe you should make sure that your task returns a serializable value. Maybe @uranusjr has more to say about it, but IMHO returning a JSON-serializable value is the responsibility of the person who wants the operator/task to work in a dynamic task context.
Converting it into a discussion if needed.
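Following that advice, one option is to convert the upstream value into plain Python types inside the consuming task before returning it, which is what the reporter's `reduce_task` already does with `list(data or [])`. The helper below is a hypothetical sketch (the name `to_json_safe` is an assumption, not an Airflow API). It recursively turns any non-string sequence into a `list` and any mapping into a `dict` so that `json.dumps` accepts the result. It assumes the lazy proxy behaves like a sequence, as the working `list(...)` call suggests; in the issue's DAG it would be used as `return to_json_safe(arg_1)` inside `non_reduce_task`.

```python
import json
from collections.abc import Mapping, Sequence


def to_json_safe(value):
    """Hypothetical helper: recursively convert list-like and dict-like
    containers into plain lists and dicts so json.dumps can encode them."""
    if isinstance(value, (str, bytes)):
        # Strings and bytes are sequences too; leave them untouched.
        return value
    if isinstance(value, Mapping):
        return {key: to_json_safe(item) for key, item in value.items()}
    if isinstance(value, Sequence):
        return [to_json_safe(item) for item in value]
    # Scalars (and anything else) are returned as-is.
    return value


# `range` is a Sequence that json.dumps rejects on its own.
nested = {"keys": range(3), "pairs": (("a", 1), ("b", 2))}
print(json.dumps(to_json_safe(nested)))  # {"keys": [0, 1, 2], "pairs": [["a", 1], ["b", 2]]}
```

Converting explicitly keeps the decision with the DAG author, which matches the position above that the task itself is responsible for returning a serializable value.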
[GitHub] [airflow] potiuk closed issue #26980: Output from traditional Task can not be used in task decorator without a *reduce* task
Posted by GitBox <gi...@apache.org>.
potiuk closed issue #26980: Output from traditional Task can not be used in task decorator without a *reduce* task
URL: https://github.com/apache/airflow/issues/26980