Posted to commits@airflow.apache.org by "Yohei Onishi (JIRA)" <ji...@apache.org> on 2018/12/26 09:52:00 UTC

[jira] [Assigned] (AIRFLOW-3568) S3ToGoogleCloudStorageOperator failed after succeeding in copying files from s3

     [ https://issues.apache.org/jira/browse/AIRFLOW-3568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yohei Onishi reassigned AIRFLOW-3568:
-------------------------------------

    Assignee: Yohei Onishi

> S3ToGoogleCloudStorageOperator failed after succeeding in copying files from s3
> -------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3568
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3568
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 1.10.0
>            Reporter: Yohei Onishi
>            Assignee: Yohei Onishi
>            Priority: Major
>
> I tried to copy files from S3 to GCS using S3ToGoogleCloudStorageOperator. The file was successfully uploaded to GCS, but the task failed with the following error.
> {code:java}
> [2018-12-26 07:56:33,062] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,062] {discovery.py:871} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/stg-rfid-etl-tmp/o?name=rfid_wh%2Fuq%2Fjp%2Fno_resp_carton_1D%2F2018%2F12%2F24%2F21%2Fno_resp_carton_20181224210201.csv&alt=json&uploadType=media
> [2018-12-26 07:56:33,214] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,213] {s3_to_gcs_operator.py:177} INFO - All done, uploaded 1 files to Google Cloud Storage
> [2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is not JSON serializable
> Traceback (most recent call last):
>   File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
>   File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
>     execution_date=execution_date or self.execution_date)
>   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
>     value = json.dumps(value).encode('UTF-8')
>   File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
>     return _default_encoder.encode(obj)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
>     chunks = self.iterencode(o, _one_shot=True)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
>     return _iterencode(o, 0)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
>     o.__class__.__name__)
> TypeError: Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 Traceback (most recent call last):
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
> [2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     execution_date=execution_date or self.execution_date)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     return func(*args, **kwargs)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     value = json.dumps(value).encode('UTF-8')
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     return _default_encoder.encode(obj)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     chunks = self.iterencode(o, _one_shot=True)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     return _iterencode(o, 0)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     o.__class__.__name__)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 TypeError: Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
> {code}
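> For reference, the operator was invoked roughly as follows. This is a minimal sketch only: the task_id, object prefix, and destination bucket are taken from the log above, while the source S3 bucket name, schedule, and connection IDs are assumptions.
> {code:python}
> from datetime import datetime
>
> from airflow import DAG
> from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
>
> dag = DAG(
>     dag_id='s3_to_gcs_copy',          # assumption: actual DAG id not shown in the log
>     start_date=datetime(2018, 12, 1),
>     schedule_interval=None,
> )
>
> gcs_copy_files_from_s3 = S3ToGoogleCloudStorageOperator(
>     task_id='gcs_copy_files_from_s3',             # task id as it appears in the log
>     bucket='my-source-bucket',                    # assumption: source S3 bucket not shown
>     prefix='rfid_wh/uq/jp/no_resp_carton_1D/',    # prefix from the uploaded object path
>     dest_gcs='gs://stg-rfid-etl-tmp/',            # destination bucket from the request URL
>     dest_gcs_conn_id='google_cloud_default',
>     aws_conn_id='aws_default',
>     replace=False,
>     dag=dag,
> )
> {code}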
>  
> According to the error log, the task failed because the operator returns the list of uploaded files as a set; that return value is pushed to XCom, and xcom_push cannot JSON-serialize a set.
> [https://github.com/apache/incubator-airflow/blob/1.10.0/airflow/models.py#L1637]
> {code:java}
> # If the task returns a result, push an XCom containing it
> if result is not None:
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result){code}
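> As the traceback shows, xcom_push ends up calling json.dumps on the returned value, which works for a list but not for a set. A minimal standalone reproduction, using the file name from the log above:
> {code:python}
> import json
>
> files = {'no_resp_carton_20181224210201.csv'}
>
> json.dumps(list(files))  # OK: '["no_resp_carton_20181224210201.csv"]'
> json.dumps(files)        # TypeError: Object of type 'set' is not JSON serializable
> {code}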
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L155]
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L198]
> {code:java}
> files = set(files) - set(existing_files)
> ...
> return files{code}
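> Until the operator is changed to return a list (for example, returning list(files) instead of the set), a possible workaround is a thin subclass that casts the return value before it reaches xcom_push. This is only a sketch, not a tested patch, and the subclass name is made up:
> {code:python}
> from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
>
>
> class S3ToGCSListOperator(S3ToGoogleCloudStorageOperator):
>     """Hypothetical workaround: return a JSON-serializable list instead of a set."""
>
>     def execute(self, context):
>         uploaded = super(S3ToGCSListOperator, self).execute(context)
>         # xcom_push JSON-serializes the return value, so cast the set to a list.
>         return list(uploaded) if uploaded is not None else None
> {code}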
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)