Posted to commits@airflow.apache.org by "Yohei Onishi (JIRA)" <ji...@apache.org> on 2018/12/26 09:52:00 UTC
[jira] [Assigned] (AIRFLOW-3568) S3ToGoogleCloudStorageOperator failed after succeeding in copying files from s3
[ https://issues.apache.org/jira/browse/AIRFLOW-3568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yohei Onishi reassigned AIRFLOW-3568:
-------------------------------------
Assignee: Yohei Onishi
> S3ToGoogleCloudStorageOperator failed after succeeding in copying files from s3
> -------------------------------------------------------------------------------
>
> Key: AIRFLOW-3568
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3568
> Project: Apache Airflow
> Issue Type: Bug
> Components: contrib
> Affects Versions: 1.10.0
> Reporter: Yohei Onishi
> Assignee: Yohei Onishi
> Priority: Major
>
> I tried to copy files from S3 to GCS using S3ToGoogleCloudStorageOperator. The file was successfully uploaded to GCS, but the task failed with the following error.
> {code:java}
> [2018-12-26 07:56:33,062] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,062] {discovery.py:871} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/stg-rfid-etl-tmp/o?name=rfid_wh%2Fuq%2Fjp%2Fno_resp_carton_1D%2F2018%2F12%2F24%2F21%2Fno_resp_carton_20181224210201.csv&alt=json&uploadType=media
> [2018-12-26 07:56:33,214] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,213] {s3_to_gcs_operator.py:177} INFO - All done, uploaded 1 files to Google Cloud Storage
> [2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is not JSON serializable
> Traceback (most recent call last):
>   File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
>   File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
>     execution_date=execution_date or self.execution_date)
>   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
>     value = json.dumps(value).encode('UTF-8')
>   File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
>     return _default_encoder.encode(obj)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
>     chunks = self.iterencode(o, _one_shot=True)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
>     return _iterencode(o, 0)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
>     o.__class__.__name__)
> TypeError: Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 Traceback (most recent call last):
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
> [2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 self.xcom_push(key=XCOM_RETURN_KEY, value=result)
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 execution_date=execution_date or self.execution_date)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 return func(*args, **kwargs)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 value = json.dumps(value).encode('UTF-8')
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 return _default_encoder.encode(obj)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 chunks = self.iterencode(o, _one_shot=True)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 return _iterencode(o, 0)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 o.__class__.__name__)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 TypeError: Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
> {code}
>
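> According to the error log, the task failed because the operator returns the list of uploaded files as a set. Airflow pushes any non-None return value to XCom, and xcom_push serializes it with json.dumps, which cannot encode a set.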
> [https://github.com/apache/incubator-airflow/blob/1.10.0/airflow/models.py#L1637]
> {code:java}
> # If the task returns a result, push an XCom containing it
> if result is not None:
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
> {code}
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L155]
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L198]
> {code:java}
> files = set(files) - set(existing_files)
> ...
> return files
> {code}
>
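The failure can be reproduced outside Airflow with the standard library alone. This sketch assumes the XCom serialization step is just the json.dumps(value).encode('UTF-8') call shown in the traceback; serialize_xcom_value and files_to_upload are hypothetical names used for illustration, and returning a sorted list is one possible fix, not necessarily the one adopted upstream.

```python
import json


def serialize_xcom_value(value):
    # Hypothetical stand-in for the failing step in the traceback
    # (models.py, line 4531): XCom values are stored as
    # json.dumps(value).encode('UTF-8').
    return json.dumps(value).encode('UTF-8')


def files_to_upload(files, existing_files):
    # Hypothetical helper: same set difference as the operator, but the
    # result is returned as a sorted list so json.dumps can encode it.
    return sorted(set(files) - set(existing_files))


# Reproduce the failure: the JSON encoder rejects a set.
try:
    serialize_xcom_value({"a.csv", "c.csv"})
except TypeError as exc:
    print("TypeError:", exc)

# A list built from the same set difference serializes without error.
result = files_to_upload(["a.csv", "b.csv", "c.csv"], ["b.csv"])
print(result)                        # ['a.csv', 'c.csv']
print(serialize_xcom_value(result))  # b'["a.csv", "c.csv"]'
```

Sorting is a design choice: list(...) alone would also serialize, but set iteration order is arbitrary, so sorting keeps the pushed XCom value deterministic across runs.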
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)