Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/26 16:07:03 UTC
[GitHub] [airflow] vchiapaikeo commented on issue #26248: BaseSQLToGCSOperator Parquet Format Fails to Write Dates/JSON
URL: https://github.com/apache/airflow/issues/26248#issuecomment-1365266041
I've started looking into this and, at least for ints/dates, I can't seem to repro it. I tried with the DAG below:
```py
from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import (
    PostgresToGCSOperator,
)

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_postgres_to_gcs",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    test_postgres_to_gcs = PostgresToGCSOperator(
        task_id="test_postgres_to_gcs",
        postgres_conn_id="postgres_default",
        sql="""
        SELECT id, dag_id, start_date
        FROM job
        LIMIT 2
        """,
        export_format="parquet",
        gcp_conn_id="google_cloud_default",
        bucket="my-bucket",
        filename="vchiapaikeo/sql-to-gcs/file.parquet",
    )
```
Task logs from the run:
```
*** Reading local file: /root/airflow/logs/dag_id=test_os_postgres_to_gcs/run_id=scheduled__2022-12-25T00:00:00+00:00/task_id=test_postgres_to_gcs/attempt=13.log
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1282} INFO -
--------------------------------------------------------------------------------
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1283} INFO - Starting attempt 13 of 14
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1284} INFO -
--------------------------------------------------------------------------------
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1303} INFO - Executing <Task(PostgresToGCSOperator): test_postgres_to_gcs> on 2022-12-25 00:00:00+00:00
[2022-12-26, 16:01:51 UTC] {standard_task_runner.py:55} INFO - Started process 1333 to run task
[2022-12-26, 16:01:51 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'test_os_postgres_to_gcs', 'test_postgres_to_gcs', 'scheduled__2022-12-25T00:00:00+00:00', '--job-id', '15', '--raw', '--subdir', 'DAGS_FOLDER/test_postgres.py', '--cfg-path', '/tmp/tmp7276gvee']
[2022-12-26, 16:01:51 UTC] {standard_task_runner.py:83} INFO - Job 15: Subtask test_postgres_to_gcs
[2022-12-26, 16:01:51 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [running]> on host d4b52cabcc3d
[2022-12-26, 16:01:52 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_postgres_to_gcs
AIRFLOW_CTX_TASK_ID=test_postgres_to_gcs
AIRFLOW_CTX_EXECUTION_DATE=2022-12-25T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=13
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-25T00:00:00+00:00
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:140} INFO - Executing query
[2022-12-26, 16:01:52 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:159} INFO - Writing local data files
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:251} INFO - Logging row: [10, 'test_os_postgres_to_gcs', 1672069731.407203]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['id', 'dag_id', 'start_date']
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'id': [10], 'dag_id': ['test_os_postgres_to_gcs'], 'start_date': [1672069731.407203]}
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: id: int64
dag_id: string
start_date: timestamp[s]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:251} INFO - Logging row: [11, 'test_os_postgres_to_gcs', 1672069921.269374]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['id', 'dag_id', 'start_date']
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'id': [11], 'dag_id': ['test_os_postgres_to_gcs'], 'start_date': [1672069921.269374]}
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: id: int64
dag_id: string
start_date: timestamp[s]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:164} INFO - Uploading chunk file #0 to GCS.
[2022-12-26, 16:01:52 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-26, 16:01:52 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-26, 16:01:52 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2022-12-26, 16:01:52 UTC] {gcs.py:520} INFO - File /tmp/tmpz75b333l uploaded to vchiapaikeo/sql-to-gcs/file.parquet in my-bucket bucket
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:167} INFO - Removing local file
[2022-12-26, 16:01:53 UTC] {taskinstance.py:1326} INFO - Marking task as SUCCESS. dag_id=test_os_postgres_to_gcs, task_id=test_postgres_to_gcs, execution_date=20221225T000000, start_date=20221226T160151, end_date=20221226T160153
[2022-12-26, 16:01:53 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2022-12-26, 16:01:53 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check
```
And the output file looks fine:
```
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ gsutil cp gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet .
Copying gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet...
/ [1 files][ 2.3 KiB/ 2.3 KiB]
Operation completed over 1 objects/2.3 KiB.
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ pqrs cat file.parquet
##################
File: file.parquet
##################
{id: 10, dag_id: "test_os_postgres_to_gcs", start_date: 2022-12-26 15:48:51 +00:00}
{id: 11, dag_id: "test_os_postgres_to_gcs", start_date: 2022-12-26 15:52:01 +00:00}
```
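As a quick sanity check, the epoch floats in the `Logging row` lines can be compared against the timestamps `pqrs` printed. This is a minimal sketch using only the standard library; `to_utc_seconds` is a hypothetical helper name, and dropping the fractional seconds is an assumption (the logs are consistent with either truncation or rounding to the `timestamp[s]` resolution).

```python
from datetime import datetime, timezone

# Epoch floats copied from the "Logging row" lines in the task log above.
logged_start_dates = [1672069731.407203, 1672069921.269374]


def to_utc_seconds(epoch: float) -> datetime:
    """Interpret an epoch float as UTC, dropping fractional seconds (assumption)."""
    return datetime.fromtimestamp(int(epoch), tz=timezone.utc)


for value in logged_start_dates:
    print(to_utc_seconds(value).isoformat(sep=" "))
# 2022-12-26 15:48:51+00:00
# 2022-12-26 15:52:01+00:00
```

Both values line up with the `start_date` fields in the parquet file, so the date path looks healthy here.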
I do think this repros with the json type, though. Here's the log from trying to export the serialized_dag table, whose data column is of type json:
```
*** Reading local file: /root/airflow/logs/dag_id=test_os_postgres_to_gcs/run_id=scheduled__2022-12-25T00:00:00+00:00/task_id=test_postgres_to_gcs/attempt=16.log
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1282} INFO -
--------------------------------------------------------------------------------
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1283} INFO - Starting attempt 16 of 17
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1284} INFO -
--------------------------------------------------------------------------------
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1303} INFO - Executing <Task(PostgresToGCSOperator): test_postgres_to_gcs> on 2022-12-25 00:00:00+00:00
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:55} INFO - Started process 1433 to run task
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'test_os_postgres_to_gcs', 'test_postgres_to_gcs', 'scheduled__2022-12-25T00:00:00+00:00', '--job-id', '18', '--raw', '--subdir', 'DAGS_FOLDER/test_postgres.py', '--cfg-path', '/tmp/tmpi1oep8jw']
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:83} INFO - Job 18: Subtask test_postgres_to_gcs
[2022-12-26, 16:04:37 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [running]> on host d4b52cabcc3d
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_postgres_to_gcs
AIRFLOW_CTX_TASK_ID=test_postgres_to_gcs
AIRFLOW_CTX_EXECUTION_DATE=2022-12-25T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=16
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-25T00:00:00+00:00
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:140} INFO - Executing query
[2022-12-26, 16:04:37 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:159} INFO - Writing local data files
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:251} INFO - Logging row: ['test_os_patch_gcs_to_bigquery', '/files/dags/test.py', 28748778209882151, {'__version': 1, 'dag': {'max_active_runs': 1, 'default_args': {'__var': {'owner': 'gcp-data-platform', 'retries': 1, 'retry_delay': 10, 'start_date': {'__var': 1659312000.0, '__type': 'datetime'}}, '__type': 'dict'}, 'edge_info': {}, 'catchup': False, 'timezone': 'UTC', 'fileloc': '/files/dags/test.py', '_task_group': {'_group_id': None, 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'test_gcs_to_bigquery': ['operator', 'test_gcs_to_bigquery']}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}, 'dataset_triggers': [], 'schedule_interval': '@daily', '_max_active_tasks': 2, '_dag_id': 'test_os_patch_gcs_to_bigquery', 'start_date': 1659312000.0, '_processor_dags_folder': '/files/dags', 'tasks': [{'ui_fgcolor': '#000', 'template_ext': ['.sql'], 'downstream_task_ids': [], 'retry_delay': 10.0, 'template_fields': ['bucket', 'source_objects', 'schema_object', 'schema_object_bucket', 'destination_project_dataset_table', 'impersonation_chain'], 'retries': 1, 'owner': 'gcp-data-platform', 'pool': 'default_pool', 'task_id': 'test_gcs_to_bigquery', 'ui_color': '#f0eee4', 'template_fields_renderers': {}, '_task_type': 'GCSToBigQueryOperator', '_task_module': '***.providers.google.cloud.transfers.gcs_to_bigquery', '_is_empty': False, '_operator_extra_links': [{'***.providers.google.cloud.links.bigquery.BigQueryTableLink': {}}], 'bucket': 'my-bucket', 'source_objects': ['vchiapaikeo/file.csv'], 'schema_object_bucket': 'my-bucket', 'destination_project_dataset_table': 'my-project.vchiapaikeo.test1'}], 'dag_dependencies': [], 'params': {}}}, None, 1672068095.636378, '276713bbfc190eed20dcb70780b5b47e', None]
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['dag_id', 'fileloc', 'fileloc_hash', 'data', 'data_compressed', 'last_updated', 'dag_hash', 'processor_subdir']
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'dag_id': ['test_os_patch_gcs_to_bigquery'], 'fileloc': ['/files/dags/test.py'], 'fileloc_hash': [28748778209882151], 'data': [{'__version': 1, 'dag': {'max_active_runs': 1, 'default_args': {'__var': {'owner': 'gcp-data-platform', 'retries': 1, 'retry_delay': 10, 'start_date': {'__var': 1659312000.0, '__type': 'datetime'}}, '__type': 'dict'}, 'edge_info': {}, 'catchup': False, 'timezone': 'UTC', 'fileloc': '/files/dags/test.py', '_task_group': {'_group_id': None, 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'test_gcs_to_bigquery': ['operator', 'test_gcs_to_bigquery']}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}, 'dataset_triggers': [], 'schedule_interval': '@daily', '_max_active_tasks': 2, '_dag_id': 'test_os_patch_gcs_to_bigquery', 'start_date': 1659312000.0, '_processor_dags_folder': '/files/dags', 'tasks': [{'ui_fgcolor': '#000', 'template_ext': ['.sql'], 'downstream_task_ids': [], 'retry_delay': 10.0, 'template_fields': ['bucket', 'source_objects', 'schema_object', 'schema_object_bucket', 'destination_project_dataset_table', 'impersonation_chain'], 'retries': 1, 'owner': 'gcp-data-platform', 'pool': 'default_pool', 'task_id': 'test_gcs_to_bigquery', 'ui_color': '#f0eee4', 'template_fields_renderers': {}, '_task_type': 'GCSToBigQueryOperator', '_task_module': '***.providers.google.cloud.transfers.gcs_to_bigquery', '_is_empty': False, '_operator_extra_links': [{'***.providers.google.cloud.links.bigquery.BigQueryTableLink': {}}], 'bucket': 'my-bucket', 'source_objects': ['vchiapaikeo/file.csv'], 'schema_object_bucket': 'my-bucket', 'destination_project_dataset_table': 'my-project.vchiapaikeo.test1'}], 'dag_dependencies': [], 'params': {}}}], 'data_compressed': [None], 'last_updated': [1672068095.636378], 'dag_hash': ['276713bbfc190eed20dcb70780b5b47e'], 'processor_subdir': [None]}
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: dag_id: string
fileloc: string
fileloc_hash: int64
data: string
data_compressed: string
last_updated: timestamp[s]
dag_hash: string
processor_subdir: string
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1782} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 160, in execute
for file_to_upload in self._write_local_data_files(cursor):
File "/opt/airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 255, in _write_local_data_files
tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
File "pyarrow/table.pxi", line 3648, in pyarrow.lib.Table.from_pydict
File "pyarrow/table.pxi", line 5191, in pyarrow.lib._from_pydict
File "pyarrow/array.pxi", line 343, in pyarrow.lib.asarray
File "pyarrow/array.pxi", line 317, in pyarrow.lib.array
File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Expected bytes, got a 'dict' object
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1326} INFO - Marking task as UP_FOR_RETRY. dag_id=test_os_postgres_to_gcs, task_id=test_postgres_to_gcs, execution_date=20221225T000000, start_date=20221226T160437, end_date=20221226T160437
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:105} ERROR - Failed to execute job 18 for task test_postgres_to_gcs (Expected bytes, got a 'dict' object; 1433)
[2022-12-26, 16:04:37 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2022-12-26, 16:04:37 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check
```
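The traceback shows pyarrow being handed a Python `dict` for the `data` column, while the logged parquet schema declares that column as `string`. One possible workaround, sketched here as an assumption and not necessarily the fix that lands upstream, is to JSON-encode any dict or list cells before building the table. `stringify_json_columns` is a hypothetical helper name, not part of Airflow, and the sample row is a trimmed-down stand-in for the logged `row_pydic`.

```python
import json


def stringify_json_columns(row_pydic: dict) -> dict:
    """Return a copy of row_pydic with dict/list cells JSON-encoded as strings."""
    return {
        column: [
            json.dumps(value) if isinstance(value, (dict, list)) else value
            for value in values
        ]
        for column, values in row_pydic.items()
    }


# Trimmed-down stand-in for the row_pydic in the log above (illustrative only).
row_pydic = {
    "dag_id": ["test_os_patch_gcs_to_bigquery"],
    "data": [{"__version": 1, "dag": {"catchup": False}}],
    "data_compressed": [None],
}

converted = stringify_json_columns(row_pydic)
print(converted["data"])
# ['{"__version": 1, "dag": {"catchup": false}}']
```

The converted dict could then go through the same `pa.Table.from_pydict(row_pydic, parquet_schema)` call seen in the traceback, since every `data` cell is now a `str` (or `None`) matching the `string` column type.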