Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/26 16:07:03 UTC

[GitHub] [airflow] vchiapaikeo commented on issue #26248: BaseSQLToGCSOperator Parquet Format Fails to Write Dates/JSON

vchiapaikeo commented on issue #26248:
URL: https://github.com/apache/airflow/issues/26248#issuecomment-1365266041

   Starting to take a look at this, and at least for ints/dates I can't seem to repro it. Trying with the DAG below:
   
   ```py
   from airflow import DAG
   
   from airflow.providers.google.cloud.transfers.postgres_to_gcs import (
       PostgresToGCSOperator,
   )
   
   DEFAULT_TASK_ARGS = {
       "owner": "gcp-data-platform",
       "retries": 1,
       "retry_delay": 10,
       "start_date": "2022-08-01",
   }
   
   with DAG(
       max_active_runs=1,
       concurrency=2,
       catchup=False,
       schedule_interval="@daily",
       dag_id="test_os_postgres_to_gcs",
       default_args=DEFAULT_TASK_ARGS,
   ) as dag:
       
       test_postgres_to_gcs = PostgresToGCSOperator(
           task_id="test_postgres_to_gcs",
           postgres_conn_id="postgres_default",
           sql="""
           SELECT id, dag_id, start_date
           FROM job
           LIMIT 2
           """,
           export_format="parquet",
           gcp_conn_id="google_cloud_default",
           bucket="my-bucket",
           filename="vchiapaikeo/sql-to-gcs/file.parquet",
       )
   ```
   
   Task logs from the run:
   
   ```
   *** Reading local file: /root/airflow/logs/dag_id=test_os_postgres_to_gcs/run_id=scheduled__2022-12-25T00:00:00+00:00/task_id=test_postgres_to_gcs/attempt=13.log
   [2022-12-26, 16:01:51 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
   [2022-12-26, 16:01:51 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
   [2022-12-26, 16:01:51 UTC] {taskinstance.py:1282} INFO - 
   --------------------------------------------------------------------------------
   [2022-12-26, 16:01:51 UTC] {taskinstance.py:1283} INFO - Starting attempt 13 of 14
   [2022-12-26, 16:01:51 UTC] {taskinstance.py:1284} INFO - 
   --------------------------------------------------------------------------------
   [2022-12-26, 16:01:51 UTC] {taskinstance.py:1303} INFO - Executing <Task(PostgresToGCSOperator): test_postgres_to_gcs> on 2022-12-25 00:00:00+00:00
   [2022-12-26, 16:01:51 UTC] {standard_task_runner.py:55} INFO - Started process 1333 to run task
   [2022-12-26, 16:01:51 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'test_os_postgres_to_gcs', 'test_postgres_to_gcs', 'scheduled__2022-12-25T00:00:00+00:00', '--job-id', '15', '--raw', '--subdir', 'DAGS_FOLDER/test_postgres.py', '--cfg-path', '/tmp/tmp7276gvee']
   [2022-12-26, 16:01:51 UTC] {standard_task_runner.py:83} INFO - Job 15: Subtask test_postgres_to_gcs
   [2022-12-26, 16:01:51 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [running]> on host d4b52cabcc3d
   [2022-12-26, 16:01:52 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
   AIRFLOW_CTX_DAG_ID=test_os_postgres_to_gcs
   AIRFLOW_CTX_TASK_ID=test_postgres_to_gcs
   AIRFLOW_CTX_EXECUTION_DATE=2022-12-25T00:00:00+00:00
   AIRFLOW_CTX_TRY_NUMBER=13
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-25T00:00:00+00:00
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:140} INFO - Executing query
   [2022-12-26, 16:01:52 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:159} INFO - Writing local data files
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:251} INFO - Logging row: [10, 'test_os_postgres_to_gcs', 1672069731.407203]
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['id', 'dag_id', 'start_date']
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'id': [10], 'dag_id': ['test_os_postgres_to_gcs'], 'start_date': [1672069731.407203]}
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: id: int64
   dag_id: string
   start_date: timestamp[s]
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:251} INFO - Logging row: [11, 'test_os_postgres_to_gcs', 1672069921.269374]
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['id', 'dag_id', 'start_date']
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'id': [11], 'dag_id': ['test_os_postgres_to_gcs'], 'start_date': [1672069921.269374]}
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: id: int64
   dag_id: string
   start_date: timestamp[s]
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:164} INFO - Uploading chunk file #0 to GCS.
   [2022-12-26, 16:01:52 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
   [2022-12-26, 16:01:52 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
   [2022-12-26, 16:01:52 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
   [2022-12-26, 16:01:52 UTC] {gcs.py:520} INFO - File /tmp/tmpz75b333l uploaded to vchiapaikeo/sql-to-gcs/file.parquet in my-bucket bucket
   [2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:167} INFO - Removing local file
   [2022-12-26, 16:01:53 UTC] {taskinstance.py:1326} INFO - Marking task as SUCCESS. dag_id=test_os_postgres_to_gcs, task_id=test_postgres_to_gcs, execution_date=20221225T000000, start_date=20221226T160151, end_date=20221226T160153
   [2022-12-26, 16:01:53 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
   [2022-12-26, 16:01:53 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
   
   And the output file seems fine:
   
   ```
   vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ gsutil cp gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet .
   Copying gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet...
   / [1 files][  2.3 KiB/  2.3 KiB]                                                
   Operation completed over 1 objects/2.3 KiB.                                      
   
   vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ 
   vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ pqrs cat file.parquet
   
   ##################
   File: file.parquet
   ##################
   
   {id: 10, dag_id: "test_os_postgres_to_gcs", start_date: 2022-12-26 15:48:51 +00:00}
   {id: 11, dag_id: "test_os_postgres_to_gcs", start_date: 2022-12-26 15:52:01 +00:00}
   ```
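
   Equivalently, for anyone without pqrs handy, a quick sanity check of the same downloaded file with pyarrow itself (assuming it has been copied locally as file.parquet, as above):

   ```py
   import pyarrow.parquet as pq

   # Read the downloaded file and dump its schema and rows for inspection.
   table = pq.read_table("file.parquet")
   print(table.schema)
   print(table.to_pylist())
   ```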
   
   I do think this repros with the json type, though. Here's a log from trying to export the serialized_dag table, whose data column is of type json:
   
   ```
   *** Reading local file: /root/airflow/logs/dag_id=test_os_postgres_to_gcs/run_id=scheduled__2022-12-25T00:00:00+00:00/task_id=test_postgres_to_gcs/attempt=16.log
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:1282} INFO - 
   --------------------------------------------------------------------------------
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:1283} INFO - Starting attempt 16 of 17
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:1284} INFO - 
   --------------------------------------------------------------------------------
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:1303} INFO - Executing <Task(PostgresToGCSOperator): test_postgres_to_gcs> on 2022-12-25 00:00:00+00:00
   [2022-12-26, 16:04:37 UTC] {standard_task_runner.py:55} INFO - Started process 1433 to run task
   [2022-12-26, 16:04:37 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'test_os_postgres_to_gcs', 'test_postgres_to_gcs', 'scheduled__2022-12-25T00:00:00+00:00', '--job-id', '18', '--raw', '--subdir', 'DAGS_FOLDER/test_postgres.py', '--cfg-path', '/tmp/tmpi1oep8jw']
   [2022-12-26, 16:04:37 UTC] {standard_task_runner.py:83} INFO - Job 18: Subtask test_postgres_to_gcs
   [2022-12-26, 16:04:37 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [running]> on host d4b52cabcc3d
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
   AIRFLOW_CTX_DAG_ID=test_os_postgres_to_gcs
   AIRFLOW_CTX_TASK_ID=test_postgres_to_gcs
   AIRFLOW_CTX_EXECUTION_DATE=2022-12-25T00:00:00+00:00
   AIRFLOW_CTX_TRY_NUMBER=16
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-25T00:00:00+00:00
   [2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:140} INFO - Executing query
   [2022-12-26, 16:04:37 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
   [2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:159} INFO - Writing local data files
   [2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:251} INFO - Logging row: ['test_os_patch_gcs_to_bigquery', '/files/dags/test.py', 28748778209882151, {'__version': 1, 'dag': {'max_active_runs': 1, 'default_args': {'__var': {'owner': 'gcp-data-platform', 'retries': 1, 'retry_delay': 10, 'start_date': {'__var': 1659312000.0, '__type': 'datetime'}}, '__type': 'dict'}, 'edge_info': {}, 'catchup': False, 'timezone': 'UTC', 'fileloc': '/files/dags/test.py', '_task_group': {'_group_id': None, 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'test_gcs_to_bigquery': ['operator', 'test_gcs_to_bigquery']}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}, 'dataset_triggers': [], 'schedule_interval': '@daily', '_max_active_tasks': 2, '_dag_id': 'test_os_patch_gcs_to_bigquery', 'start_date': 1659312000.0, '_processor_dags_folder': '/files/dags', 'tasks': [{'ui_fgcolor': '#000', 'template_ext': ['.sql'], 'downstream_task_ids': [], 'retry_delay': 10.0, 'template_fields': ['bucket', 'source_objects', 'schema_object', 'schema_object_bucket', 'destination_project_dataset_table', 'impersonation_chain'], 'retries': 1, 'owner': 'gcp-data-platform', 'pool': 'default_pool', 'task_id': 'test_gcs_to_bigquery', 'ui_color': '#f0eee4', 'template_fields_renderers': {}, '_task_type': 'GCSToBigQueryOperator', '_task_module': '***.providers.google.cloud.transfers.gcs_to_bigquery', '_is_empty': False, '_operator_extra_links': [{'***.providers.google.cloud.links.bigquery.BigQueryTableLink': {}}], 'bucket': 'my-bucket', 'source_objects': ['vchiapaikeo/file.csv'], 'schema_object_bucket': 'my-bucket', 'destination_project_dataset_table': 'my-project.vchiapaikeo.test1'}], 'dag_dependencies': [], 'params': {}}}, None, 1672068095.636378, '276713bbfc190eed20dcb70780b5b47e', None]
   [2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['dag_id', 'fileloc', 'fileloc_hash', 'data', 'data_compressed', 'last_updated', 'dag_hash', 'processor_subdir']
   [2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'dag_id': ['test_os_patch_gcs_to_bigquery'], 'fileloc': ['/files/dags/test.py'], 'fileloc_hash': [28748778209882151], 'data': [{'__version': 1, 'dag': {'max_active_runs': 1, 'default_args': {'__var': {'owner': 'gcp-data-platform', 'retries': 1, 'retry_delay': 10, 'start_date': {'__var': 1659312000.0, '__type': 'datetime'}}, '__type': 'dict'}, 'edge_info': {}, 'catchup': False, 'timezone': 'UTC', 'fileloc': '/files/dags/test.py', '_task_group': {'_group_id': None, 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'test_gcs_to_bigquery': ['operator', 'test_gcs_to_bigquery']}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}, 'dataset_triggers': [], 'schedule_interval': '@daily', '_max_active_tasks': 2, '_dag_id': 'test_os_patch_gcs_to_bigquery', 'start_date': 1659312000.0, '_processor_dags_folder': '/files/dags', 'tasks': [{'ui_fgcolor': '#000', 'template_ext': ['.sql'], 'downstream_task_ids': [], 'retry_delay': 10.0, 'template_fields': ['bucket', 'source_objects', 'schema_object', 'schema_object_bucket', 'destination_project_dataset_table', 'impersonation_chain'], 'retries': 1, 'owner': 'gcp-data-platform', 'pool': 'default_pool', 'task_id': 'test_gcs_to_bigquery', 'ui_color': '#f0eee4', 'template_fields_renderers': {}, '_task_type': 'GCSToBigQueryOperator', '_task_module': '***.providers.google.cloud.transfers.gcs_to_bigquery', '_is_empty': False, '_operator_extra_links': [{'***.providers.google.cloud.links.bigquery.BigQueryTableLink': {}}], 'bucket': 'my-bucket', 'source_objects': ['vchiapaikeo/file.csv'], 'schema_object_bucket': 'my-bucket', 'destination_project_dataset_table': 'my-project.vchiapaikeo.test1'}], 'dag_dependencies': [], 'params': {}}}], 'data_compressed': [None], 'last_updated': [1672068095.636378], 'dag_hash': ['276713bbfc190eed20dcb70780b5b47e'], 'processor_subdir': [None]}
   [2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: dag_id: string
   fileloc: string
   fileloc_hash: int64
   data: string
   data_compressed: string
   last_updated: timestamp[s]
   dag_hash: string
   processor_subdir: string
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:1782} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 160, in execute
       for file_to_upload in self._write_local_data_files(cursor):
     File "/opt/airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 255, in _write_local_data_files
       tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
     File "pyarrow/table.pxi", line 3648, in pyarrow.lib.Table.from_pydict
     File "pyarrow/table.pxi", line 5191, in pyarrow.lib._from_pydict
     File "pyarrow/array.pxi", line 343, in pyarrow.lib.asarray
     File "pyarrow/array.pxi", line 317, in pyarrow.lib.array
     File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
     File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
     File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
   pyarrow.lib.ArrowTypeError: Expected bytes, got a 'dict' object
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:1326} INFO - Marking task as UP_FOR_RETRY. dag_id=test_os_postgres_to_gcs, task_id=test_postgres_to_gcs, execution_date=20221225T000000, start_date=20221226T160437, end_date=20221226T160437
   [2022-12-26, 16:04:37 UTC] {standard_task_runner.py:105} ERROR - Failed to execute job 18 for task test_postgres_to_gcs (Expected bytes, got a 'dict' object; 1433)
   [2022-12-26, 16:04:37 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
   [2022-12-26, 16:04:37 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
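
   For reference, the failure is reproducible with pyarrow alone. Below is a minimal sketch (the column name and payload are made up) that goes through the same `pa.Table.from_pydict(row_pydic, parquet_schema)` call as sql_to_gcs.py and hits the same ArrowTypeError, plus one possible workaround of JSON-encoding dict values before building the table. That's just an idea, not necessarily how the operator should fix it:

   ```py
   import json

   import pyarrow as pa

   # Same shape as the operator's call: a json column mapped to pa.string()
   # in the parquet schema, but the row value is still a Python dict.
   parquet_schema = pa.schema([("data", pa.string())])
   row_pydic = {"data": [{"__version": 1, "dag": {"_dag_id": "example"}}]}

   try:
       pa.Table.from_pydict(row_pydic, parquet_schema)
   except pa.ArrowTypeError as exc:
       print(exc)  # Expected bytes, got a 'dict' object

   # Possible workaround: serialize dict values to JSON strings first.
   row_pydic["data"] = [json.dumps(value) for value in row_pydic["data"]]
   tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
   print(tbl.column("data")[0].as_py())
   ```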

