Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/11 04:40:14 UTC

[GitHub] [airflow] MaiHoangViet1809 opened a new issue, #26980: Output from traditional Task can not be used in task decorator without a *reduce* task

MaiHoangViet1809 opened a new issue, #26980:
URL: https://github.com/apache/airflow/issues/26980

   ### Apache Airflow version
   
   2.4.1
   
   ### What happened
   
   When building a flow between a traditional task and the TaskFlow API, I saw this error log:
   ```
   [2022-10-11, 11:23:58 +07] {taskinstance.py:1851} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/utils/session.py", line 72, in wrapper
       return func(*args, **kwargs)
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2378, in xcom_push
       XCom.set(
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/utils/session.py", line 72, in wrapper
       return func(*args, **kwargs)
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/models/xcom.py", line 206, in set
       value = cls.serialize_value(
     File "/home/prod_account/env3.10.5/lib/python3.10/site-packages/airflow/models/xcom.py", line 597, in serialize_value
       return json.dumps(value).encode('UTF-8')
     File "/home/prod_account/apps/python3_10/lib/python3.10/json/__init__.py", line 231, in dumps
       return _default_encoder.encode(obj)
     File "/home/prod_account/apps/python3_10/lib/python3.10/json/encoder.py", line 199, in encode
       chunks = self.iterencode(o, _one_shot=True)
     File "/home/prod_account/apps/python3_10/lib/python3.10/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/home/prod_account/apps/python3_10/lib/python3.10/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type _LazyXComAccess is not JSON serializable
   ```
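
   For context, here is a minimal sketch of the pattern behind this error (the names are illustrative, not from the DAG below): the aggregated output of a mapped task reaches a downstream task as a lazy `_LazyXComAccess` proxy rather than a plain list, and returning that proxy unchanged is what the default JSON-based XCom serializer rejects.

   ```
   from airflow.decorators import task

   @task
   def double(x):
       return x * 2

   @task
   def passthrough(values):
       # `values` arrives as a lazy _LazyXComAccess proxy, not a list;
       # returning it unchanged makes XCom.serialize_value() raise the
       # TypeError shown above
       return values

   # wired inside a DAG context:
   passthrough(double.expand(x=[1, 2, 3]))
   ```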
   
   ### What you think should happen instead
   
   There is a task expand call following an S3KeySensor; unless I wrap its output in a separate reduce task (one that returns a list or does some aggregation), the downstream task fails. Shouldn't the pushed result be reduced (materialized into a plain list) by default, so it can be used in the next task when that next task is not a map/expand task?
   
   ### How to reproduce
   
   This is sample code that includes both the working flow and the buggy flow:
   
       from datetime import datetime, timedelta
       from typing import Dict, Union

       from airflow import DAG
       from airflow.decorators import task
       from airflow.operators.python import get_current_context
       from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
       from airflow.sensors.base import PokeReturnValue
       
       PARAMS = {"INPUT_FOLDER": "/input",
                 "DATA_FOLDER" : "/data",
                }
       
       DEBUG = True
       
       class S3KeySensor_test(S3KeySensor):
           # poke is stubbed to always succeed so the repro does not need live S3 access
           def poke(self, context) -> Union[bool, PokeReturnValue]:
               check_result = True  # all(self._check_key(key) for key in self.bucket_key)
               if check_result:
                   return PokeReturnValue(is_done=True, xcom_value=",".join(self.bucket_key))
               else:
                   return False
       
       with DAG("AA.REPORT_BUG", start_date=datetime(2022,1,1), catchup=False, schedule=None, params=PARAMS) as dag:
       
           @task
           def generate_dependency():
               ctx = get_current_context()
               params = ctx["params"]   
               logical_date = ctx["logical_date"]
               return [{"name": "test1" , "bucket_key" : f"s3:/{ params['INPUT_FOLDER'] }/{ logical_date.strftime('%Y/%m/%d') }"}, 
                       {"name": "test2" , "bucket_key" : f"s3:/{ params['DATA_FOLDER'] }/{ logical_date.strftime('%Y/%m/%d') }"}, 
                      ]
           
           @task(multiple_outputs=True)
           def map_dependency(data) -> Dict[str, str]:
               return {"bucket_key": data["bucket_key"] }
           
           @task
           def reduce_task(data):
               return list(data or [])
             
           @task
           def non_reduce_task(arg_1):
               return arg_1
           
           # buggy flow: the mapped sensor's lazy output feeds straight into a non-mapped task
           list_dependency = generate_dependency.override(task_id="list_dependency")()
           dependencies = map_dependency.override(task_id="map_dependency").expand(data=list_dependency)
           
           list_task_sensor = S3KeySensor_test.partial(task_id="check_s3", 
                                                       wildcard_match=True,
                                                       aws_conn_id='aws_default',
                                                       timeout=300,
                                                       poke_interval=120,
                                                       mode="reschedule")\
                                              .expand_kwargs(dependencies) 
           
           bug_task = non_reduce_task.override(task_id="bug_task", retries=3, retry_delay=timedelta(minutes=15))(arg_1=list_task_sensor.output)
       
           
           # working flow: an explicit reduce task materializes the lazy output first
           list_dependency_2 = generate_dependency.override(task_id="list_dependency_2")()
           dependencies_2 = map_dependency.override(task_id="map_dependency_2").expand(data=list_dependency_2)
           
           list_task_sensor_2 = S3KeySensor_test.partial(task_id="check_s3_2", 
                                                         wildcard_match=True,
                                                         aws_conn_id='aws_default',
                                                         timeout=300,
                                                         poke_interval=120,
                                                         mode="reschedule")\
                                                .expand_kwargs(dependencies_2) 
           
           normal_task = non_reduce_task.override(task_id="normal_task", retries=3, retry_delay=timedelta(minutes=15))(arg_1=reduce_task(list_task_sensor_2.output))
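
   One workaround sketch (hypothetical, not part of the DAG above) that avoids the separate `reduce_task` is to materialize the lazy proxy inside the consuming task itself, assuming the proxy stays iterable as it is in 2.4.x:

       @task
       def coerced_task(arg_1):
           # _LazyXComAccess is iterable; turning it into a plain list makes
           # the return value JSON-serializable for the default XCom backend
           return list(arg_1)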
   
   
   ### Operating System
   
RHEL 7
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==4.1.0
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] boring-cyborg[bot] commented on issue #26980: Output from traditional Task can not be used in task decorator without a *reduce* task

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #26980:
URL: https://github.com/apache/airflow/issues/26980#issuecomment-1274077957

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] potiuk commented on issue #26980: Output from traditional Task can not be used in task decorator without a *reduce* task

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #26980:
URL: https://github.com/apache/airflow/issues/26980#issuecomment-1287947014

   I believe you should make sure that your task returns a serializable value. Maybe @uranusjr has more to say about it, but IMHO returning a JSON-serializable value is the responsibility of the person who wants the operator/task to work in a dynamic task mapping context.
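
   In practice that means the consuming task should convert the lazy XCom into something the JSON-based XCom backend can handle before returning it. A sketch (assuming the lazy value is iterable):

   ```
   import json

   from airflow.decorators import task

   @task
   def consume(values):
       payload = list(values)  # materialize the lazy proxy into a plain list
       json.dumps(payload)     # fail fast if the payload still is not JSON-serializable
       return payload
   ```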
   
   Converting this into a discussion if needed.




[GitHub] [airflow] potiuk closed issue #26980: Output from traditional Task can not be used in task decorator without a *reduce* task

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #26980: Output from traditional Task can not be used in task decorator without a *reduce* task
URL: https://github.com/apache/airflow/issues/26980

