Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/09 12:42:29 UTC

[GitHub] [airflow] rrpelgrim commented on issue #9606: Turn off pickling of XCom by default in 2.0

rrpelgrim commented on issue #9606:
URL: https://github.com/apache/airflow/issues/9606#issuecomment-964116442


   I'm running into the same issue: I load S3 data into a Dask DataFrame, compute a groupby result (a pandas Series), and pass that as input to a downstream task.
   
   I've tried setting `enable_xcom_pickling` to `True`, both in `airflow.cfg` and via the environment variable `AIRFLOW__CORE__ENABLE_XCOM_PICKLING`, but I'm still getting this error:
   
   ```
   [2021-11-09, 12:23:02 UTC] {xcom.py:333} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
   [2021-11-09, 12:23:02 UTC] {taskinstance.py:1703} ERROR - Task failed with exception
   ```
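   
   To confirm the flag is actually being picked up, one thing worth doing is printing the resolved config value from the environment where the task runs (a sketch; `check_pickling_setting` is just a hypothetical helper, not part of my DAG):
   
   ```python
   from airflow.configuration import conf
   
   def check_pickling_setting():
       # Read [core] enable_xcom_pickling the way Airflow resolves it;
       # AIRFLOW__CORE__ENABLE_XCOM_PICKLING takes precedence over airflow.cfg.
       enabled = conf.getboolean("core", "enable_xcom_pickling")
       print(f"enable_xcom_pickling = {enabled}")
       return enabled
   ```
   
   Note that if the scheduler and workers run in separate environments, the env variable has to be set wherever the task actually executes. The full DAG, for reference: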
   
   
   ```python
   from datetime import datetime, timedelta
   from time import sleep
   import coiled
   from dask.distributed import Client
   import dask.dataframe as dd
   
   from airflow.decorators import dag, task
   
   
   default_args = {
       'owner': 'rrpelgrim',
       'depends_on_past': False,
       'email': ['richard@coiled.io'],
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
   }
   
   @dag(
       default_args=default_args,
       schedule_interval=None, 
       start_date=datetime(2021, 1, 1),
       catchup=False,
       tags=['coiled-demo'],
       )
   def coiled_airflow_taskflow():
       """
       Just trying this out...
       """
       @task()
       def print_something():
           print('something')
           sleep(3)
   
       @task()
       def extract():
           """
           A sample Dask task on Coiled...
           """
           # Create and connect to Coiled cluster
           cluster = coiled.Cluster(n_workers=10, name="airflow-task")
           client = Client(cluster)
           print("Dashboard:", client.dashboard_link)
   
           # Read CSV data from S3
           df = dd.read_csv(
               "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
               dtype={
                   "payment_type": "UInt8",
                   "VendorID": "UInt8",
                   "passenger_count": "UInt8",
                   "RatecodeID": "UInt8",
               },
               storage_options={"anon": True},
               blocksize="16 MiB",
           ).persist()
   
           # Compute result
           result = df.groupby("passenger_count").tip_amount.mean().compute()
           return result
   
       @task()
       def print_result(result):
           print(result)
           sleep(3)
   
       # invoke the tasks; Airflow passes extract()'s return value to
       # print_result() via XCom and infers the dependency from that
       print_something()
       result = extract()
       print_result(result)
   
   # instantiate the DAG by calling the @dag-decorated function
   demo_taskflow = coiled_airflow_taskflow()
   ```
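   
   In the meantime, one workaround that keeps the default JSON XCom backend happy is to return plain built-in types instead of a pandas Series. A sketch of the idea, reusing the imports from the DAG above (`extract_json_friendly` is a hypothetical variant I haven't run):
   
   ```python
   @task()
   def extract_json_friendly():
       # Same computation as extract() above, but convert the resulting
       # pandas Series to built-in types so the default JSON XCom
       # serializer can store it without pickle.
       cluster = coiled.Cluster(n_workers=10, name="airflow-task")
       client = Client(cluster)
       print("Dashboard:", client.dashboard_link)
   
       df = dd.read_csv(
           "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
           dtype={
               "payment_type": "UInt8",
               "VendorID": "UInt8",
               "passenger_count": "UInt8",
               "RatecodeID": "UInt8",
           },
           storage_options={"anon": True},
           blocksize="16 MiB",
       )
       result = df.groupby("passenger_count").tip_amount.mean().compute()
   
       # A dict of str -> float is JSON-serializable, unlike a pandas Series.
       return {str(k): float(v) for k, v in result.items()}
   ```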

