Posted to commits@airflow.apache.org by "jack (JIRA)" <ji...@apache.org> on 2018/12/27 07:32:00 UTC

[jira] [Comment Edited] (AIRFLOW-3568) S3ToGoogleCloudStorageOperator failed after succeeding in copying files from s3

    [ https://issues.apache.org/jira/browse/AIRFLOW-3568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16729400#comment-16729400 ] 

jack edited comment on AIRFLOW-3568 at 12/27/18 7:31 AM:
---------------------------------------------------------

[~yohei] No problem. Note that it was approved but not yet merged; it might be a few more days, as other committers may have more comments.

You are more than welcome to check the Jira for more issues to fix.


was (Author: jackjack10):
[~yohei] No problem. Note that it was approved but not yet merged; it might be a few more days, as other committers may have more comments.

You are more than welcome to search for more issues with the gcp label to fix:

https://issues.apache.org/jira/browse/AIRFLOW-3503?jql=project%20%3D%20AIRFLOW%20AND%20labels%20%3D%20gcp

> S3ToGoogleCloudStorageOperator failed after succeeding in copying files from s3
> -------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3568
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3568
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 1.10.0
>            Reporter: Yohei Onishi
>            Assignee: Yohei Onishi
>            Priority: Major
>
> I tried to copy files from S3 to GCS using S3ToGoogleCloudStorageOperator. The file was successfully uploaded to GCS, but the task failed with the following error.
> {code:java}
> [2018-12-26 07:56:33,062] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,062] {discovery.py:871} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/stg-rfid-etl-tmp/o?name=rfid_wh%2Fuq%2Fjp%2Fno_resp_carton_1D%2F2018%2F12%2F24%2F21%2Fno_resp_carton_20181224210201.csv&alt=json&uploadType=media
> [2018-12-26 07:56:33,214] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,213] {s3_to_gcs_operator.py:177} INFO - All done, uploaded 1 files to Google Cloud Storage
> [2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is not JSON serializable
> Traceback (most recent call last):
>   File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
>   File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
>     execution_date=execution_date or self.execution_date)
>   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
>     value = json.dumps(value).encode('UTF-8')
>   File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
>     return _default_encoder.encode(obj)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
>     chunks = self.iterencode(o, _one_shot=True)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
>     return _iterencode(o, 0)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
>     o.__class__.__name__)
> TypeError: Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 Traceback (most recent call last):
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
> [2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     execution_date=execution_date or self.execution_date)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     return func(*args, **kwargs)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     value = json.dumps(value).encode('UTF-8')
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     return _default_encoder.encode(obj)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     chunks = self.iterencode(o, _one_shot=True)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     return _iterencode(o, 0)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3     o.__class__.__name__)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 TypeError: Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
> {code}
>  
> According to the error log, the task failed because the operator returns the list of uploaded files as a set, which _run_raw_task then pushes to XCom; xcom_push serializes the value with json.dumps, which does not support sets.
> [https://github.com/apache/incubator-airflow/blob/1.10.0/airflow/models.py#L1637]
> {code:java}
> # If the task returns a result, push an XCom containing it
> if result is not None:
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result){code}
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L155]
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L198]
> {code:java}
> files = set(files) - set(existing_files)
> ...
> return files{code}
>  
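The serialization failure quoted above can be reproduced outside Airflow in a few lines (the file names below are illustrative, not from the issue): json.dumps raises TypeError on a set, while the equivalent list serializes fine.

```python
import json

# A set of object names, as the operator ends up returning after
# "set(files) - set(existing_files)".
files = {"no_resp_carton_20181224210201.csv", "another_file.csv"}

# json.dumps has no encoding for set, so this raises TypeError,
# mirroring the traceback in the log.
try:
    json.dumps(files)
except TypeError as e:
    print(e)

# Converting to a list (sorted here for determinism) works.
print(json.dumps(sorted(files)))
```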

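One plausible fix, sketched below as a standalone helper rather than the actual merged patch (the function name `dedupe_against_existing` is hypothetical), is to convert the set back to a JSON-friendly list before `execute` returns it:

```python
def dedupe_against_existing(files, existing_files):
    """Return only the files not already in GCS, as a list.

    A list, unlike a set, survives XCom's json.dumps serialization.
    Sorting is optional but makes the return value deterministic.
    """
    return sorted(set(files) - set(existing_files))
```

With this, `return files` at the end of the operator's `execute` would hand XCom a list instead of a set, and the task would no longer fail after the upload succeeds.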


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)