Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/21 00:40:27 UTC

[GitHub] [airflow] watertree opened a new issue, #28513: GCSToBigQueryOperator no longer loads DATASTORE_BACKUP formats

watertree opened a new issue, #28513:
URL: https://github.com/apache/airflow/issues/28513

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   ```bash
   airflow@airflow-worker-XXXXXX-XXXXXX:~$ pip freeze | grep google-cloud 
   google-cloud-aiplatform @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_aiplatform-1.16.1-py2.py3-none-any.whl
   google-cloud-appengine-logging @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_appengine_logging-1.1.3-py2.py3-none-any.whl
   google-cloud-audit-log @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_audit_log-0.2.4-py2.py3-none-any.whl
   google-cloud-automl @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_automl-2.8.0-py2.py3-none-any.whl
   google-cloud-bigquery @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigquery-2.34.4-py2.py3-none-any.whl
   google-cloud-bigquery-datatransfer @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigquery_datatransfer-3.7.0-py2.py3-none-any.whl
   google-cloud-bigquery-storage @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigquery_storage-2.14.1-py2.py3-none-any.whl
   google-cloud-bigtable @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigtable-1.7.3-py2.py3-none-any.whl
   google-cloud-build @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_build-3.9.0-py2.py3-none-any.whl
   google-cloud-common @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_common-1.0.3-py2.py3-none-any.whl
   google-cloud-compute @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_compute-0.7.0-py2.py3-none-any.whl
   google-cloud-container @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_container-2.11.1-py2.py3-none-any.whl
   google-cloud-core @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_core-2.3.2-py2.py3-none-any.whl
   google-cloud-datacatalog @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datacatalog-3.9.0-py2.py3-none-any.whl
   google-cloud-datacatalog-lineage @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datacatalog_lineage-0.1.6-py3-none-any.whl
   google-cloud-datacatalog-lineage-producer-client @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datacatalog_lineage_producer_client-0.0.9-py3-none-any.whl
   google-cloud-dataform @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataform-0.2.0-py2.py3-none-any.whl
   google-cloud-dataplex @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataplex-1.1.0-py2.py3-none-any.whl
   google-cloud-dataproc @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataproc-5.0.0-py2.py3-none-any.whl
   google-cloud-dataproc-metastore @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataproc_metastore-1.6.0-py2.py3-none-any.whl
   google-cloud-datastore @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datastore-2.8.0-py2.py3-none-any.whl
   google-cloud-dlp @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dlp-1.0.2-py2.py3-none-any.whl
   google-cloud-filestore @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_filestore-1.2.0-py2.py3-none-any.whl
   google-cloud-firestore @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_firestore-2.5.0-py2.py3-none-any.whl
   google-cloud-kms @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_kms-2.12.0-py2.py3-none-any.whl
   google-cloud-language @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_language-1.3.2-py2.py3-none-any.whl
   google-cloud-logging @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_logging-3.2.1-py2.py3-none-any.whl
   google-cloud-memcache @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_memcache-1.4.1-py2.py3-none-any.whl
   google-cloud-monitoring @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_monitoring-2.11.0-py2.py3-none-any.whl
   google-cloud-orchestration-airflow @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_orchestration_airflow-1.4.1-py2.py3-none-any.whl
   google-cloud-os-login @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_os_login-2.7.1-py2.py3-none-any.whl
   google-cloud-pubsub @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_pubsub-2.13.4-py2.py3-none-any.whl
   google-cloud-pubsublite @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_pubsublite-0.6.1-py2.py3-none-any.whl
   google-cloud-redis @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_redis-2.9.0-py2.py3-none-any.whl
   google-cloud-resource-manager @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_resource_manager-1.6.0-py2.py3-none-any.whl
   google-cloud-secret-manager @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_secret_manager-1.0.2-py2.py3-none-any.whl
   google-cloud-spanner @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_spanner-1.19.3-py2.py3-none-any.whl
   google-cloud-speech @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_speech-1.3.4-py2.py3-none-any.whl
   google-cloud-storage @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_storage-2.6.0-py2.py3-none-any.whl
   google-cloud-tasks @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_tasks-2.10.1-py2.py3-none-any.whl
   google-cloud-texttospeech @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_texttospeech-1.0.3-py2.py3-none-any.whl
   google-cloud-translate @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_translate-1.7.2-py2.py3-none-any.whl
   google-cloud-videointelligence @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_videointelligence-1.16.3-py2.py3-none-any.whl
   google-cloud-vision @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_vision-1.0.2-py2.py3-none-any.whl
   google-cloud-workflows @ file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_workflows-1.7.1-py2.py3-none-any.whl
   ```
   
   ### Apache Airflow version
   
   2.3.4
   
   ### Operating System
   
   Linux
   
   ### Deployment
   
   Composer
   
   ### Deployment details
   
   Cloud Composer: 1.20.2
   Airflow: 2.3.4
   
   ### What happened
   
GCSToBigQueryOperator was working properly on previous versions of Airflow/Composer, but it started failing with encoding errors caused by erroneous attempts to read a schema from the source file. The error from the test case attached below:
   
   ```
   *** Reading remote log from gs://us-central1-composer-bdcca446-bucket/logs/dag_id=load_datastore_backup_from_gcs_to_bq_bug/run_id=scheduled__2022-12-20T00:00:00+00:00/task_id=load_ds_backup_from_bq/attempt=1.log.
   [2022-12-21, 00:10:03 UTC] {taskinstance.py:1172} INFO - Dependencies all met for <TaskInstance: load_datastore_backup_from_gcs_to_bq_bug.load_ds_backup_from_bq scheduled__2022-12-20T00:00:00+00:00 [queued]>
   [2022-12-21, 00:10:03 UTC] {taskinstance.py:1172} INFO - Dependencies all met for <TaskInstance: load_datastore_backup_from_gcs_to_bq_bug.load_ds_backup_from_bq scheduled__2022-12-20T00:00:00+00:00 [queued]>
   [2022-12-21, 00:10:03 UTC] {taskinstance.py:1369} INFO - 
   --------------------------------------------------------------------------------
   [2022-12-21, 00:10:03 UTC] {taskinstance.py:1370} INFO - Starting attempt 1 of 3
   [2022-12-21, 00:10:03 UTC] {taskinstance.py:1371} INFO - 
   --------------------------------------------------------------------------------
   [2022-12-21, 00:10:03 UTC] {taskinstance.py:1390} INFO - Executing <Task(GCSToBigQueryOperator): load_ds_backup_from_bq> on 2022-12-20 00:00:00+00:00
   [2022-12-21, 00:10:03 UTC] {standard_task_runner.py:52} INFO - Started process 4324 to run task
   [2022-12-21, 00:10:03 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'load_datastore_backup_from_gcs_to_bq_bug', 'load_ds_backup_from_bq', 'scheduled__2022-12-20T00:00:00+00:00', '--job-id', '55473', '--raw', '--subdir', 'DAGS_FOLDER/gcs_datastore_bq_bug_dag.py', '--cfg-path', '/tmp/tmpum3kky0y', '--error-file', '/tmp/tmpl4da9d_3']
   [2022-12-21, 00:10:03 UTC] {standard_task_runner.py:80} INFO - Job 55473: Subtask load_ds_backup_from_bq
   [2022-12-21, 00:10:04 UTC] {task_command.py:375} INFO - Running <TaskInstance: load_datastore_backup_from_gcs_to_bq_bug.load_ds_backup_from_bq scheduled__2022-12-20T00:00:00+00:00 [running]> on host airflow-worker-594959469f-9dwhs
   [2022-12-21, 00:10:04 UTC] {taskinstance.py:1583} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=load_datastore_backup_from_gcs_to_bq_bug
   AIRFLOW_CTX_TASK_ID=load_ds_backup_from_bq
   AIRFLOW_CTX_EXECUTION_DATE=2022-12-20T00:00:00+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-20T00:00:00+00:00
   [2022-12-21, 00:10:04 UTC] {base.py:68} INFO - Using connection ID 'google_cloud_default' for task execution.
   [2022-12-21, 00:10:04 UTC] {base.py:68} INFO - Using connection ID 'google_cloud_default' for task execution.
   [2022-12-21, 00:10:04 UTC] {gcs_to_bigquery.py:367} INFO - Using existing BigQuery table for storing data...
   [2022-12-21, 00:10:04 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
   [2022-12-21, 00:10:04 UTC] {bigquery.py:2252} INFO - Project is not included in destination_project_dataset_table: ds.boog; using project "*REDACTED_GCP_PROJECT*"
   [2022-12-21, 00:10:05 UTC] {base.py:68} INFO - Using connection ID 'google_cloud_default' for task execution.
   [2022-12-21, 00:10:05 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
   [2022-12-21, 00:10:05 UTC] {taskinstance.py:1904} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 397, in execute
       self.configuration = self._check_schema_fields(self.configuration)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 542, in _check_schema_fields
       fields, values = [item.split(",") for item in blob.decode("utf-8").splitlines()][:2]
   UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb7 in position 20: invalid start byte
   [2022-12-21, 00:10:05 UTC] {taskinstance.py:1408} INFO - Marking task as UP_FOR_RETRY. dag_id=load_datastore_backup_from_gcs_to_bq_bug, task_id=load_ds_backup_from_bq, execution_date=20221220T000000, start_date=20221221T001003, end_date=20221221T001005
   [2022-12-21, 00:10:05 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 55473 for task load_ds_backup_from_bq ('utf-8' codec can't decode byte 0xb7 in position 20: invalid start byte; 4324)
   [2022-12-21, 00:10:05 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
   [2022-12-21, 00:10:05 UTC] {local_task_job.py:279} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
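   For context on the failure, the traceback shows `_check_schema_fields` trying to sniff a schema by UTF-8-decoding the first lines of the source object. A Datastore export is a binary file, not CSV, so that decode step can never succeed on it. A minimal sketch reproducing just that step (the byte string is a made-up stand-in for real export bytes, not actual export content):
   
   ```python
   # Made-up stand-in for a binary Datastore export; the 0xb7 byte mirrors
   # the one reported in the error above.
   blob = b"some header text\x00\x01\x02\xb7 more binary payload"
   
   try:
       # Mirrors the failing line from gcs_to_bigquery.py in the traceback:
       # CSV-style schema sniffing on the raw object bytes.
       fields, values = [item.split(",") for item in blob.decode("utf-8").splitlines()][:2]
   except UnicodeDecodeError as exc:
       # Raises because a binary Datastore export is not valid UTF-8 text.
       print(f"Schema sniffing fails on binary input: {exc}")
   ```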
   
   ### What you think should happen instead
   
The operator should load the table properly; it did so on previous versions, and the same backup also loads when the table is created manually in BigQuery.
   
   ### How to reproduce
   
The sample below can be unzipped and copied into a Cloud Storage bucket, assuming everything sits in an `airflow` directory:
   
   [airflow.zip](https://github.com/apache/airflow/files/10273565/airflow.zip)
   
Set up a bucket of your choice to send the backup to, for example:
   
   ```bash
   gsutil rsync -r airflow gs://bucket-name-here/airflow
   ```
Drop in the following DAG and replace the variables marked for replacement:
   
   ```python
   from datetime import timedelta, datetime
   from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
   from airflow.operators.dummy_operator import DummyOperator
   from airflow import DAG
   
   yesterday = datetime.combine(
       datetime.today() - timedelta(1),
       datetime.min.time())
   
   # Replace these values
   BUCKET = 'bucket-name'
GCP_PROJECT = 'gcp-project'
DATASET_TABLE = 'dataset.table'
   
   default_args = {
       'start_date': yesterday,
       'project_id': GCP_PROJECT
   }
   
   schedule_interval = '@once'
   
   dag = DAG('load_datastore_backup_from_gcs_to_bq_bug',
             default_args=default_args,
             schedule_interval=schedule_interval)
   
   start = DummyOperator(task_id='start', dag=dag)
   end = DummyOperator(task_id='end', dag=dag)
   
   bq_load = GCSToBigQueryOperator(
       task_id='load_ds_backup_from_bq',
       source_format='DATASTORE_BACKUP',
       bucket=BUCKET,
       source_objects=[
           'airflow/namespace_airflow/kind_BugEntity/namespace_airflow_kind_BugEntity.export_metadata'],
       destination_project_dataset_table=DATASET_TABLE,
       write_disposition='WRITE_TRUNCATE',
       create_disposition='CREATE_IF_NEEDED',
       dag=dag
   )
   
   start >> bq_load
   bq_load >> end
   ```
   
   
   ### Anything else
   
This happens every time since the upgrade. I believe I was running 2.2.5 before a Composer-managed upgrade (I am not certain), but several DAGs stopped working as a result of the upgrade.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   




[GitHub] [airflow] boring-cyborg[bot] commented on issue #28513: GCSToBigQueryOperator no longer loads DATASTORE_BACKUP formats

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #28513:
URL: https://github.com/apache/airflow/issues/28513#issuecomment-1360586362

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] watertree commented on issue #28513: GCSToBigQueryOperator no longer loads DATASTORE_BACKUP formats

Posted by GitBox <gi...@apache.org>.
watertree commented on issue #28513:
URL: https://github.com/apache/airflow/issues/28513#issuecomment-1363014661

   This was a regression in the provider version included in Airflow 2.3.4. I pulled the latest version of the operator from the `main` branch (`23264fb820c179e9951ea9706f68b13a9b3fdbc0` at the time of testing) and confirmed that it now works properly after copying the file to the worker pods.
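   
   For anyone needing a stopgap before a fixed provider release, a hypothetical sketch of the hot-fix described above: overlay the fixed module from `main` onto each worker's installed provider. The raw-file URL pattern and the target path (taken from the traceback) are assumptions, not an officially supported procedure:
   
   ```python
   # Hypothetical hot-fix sketch: download the fixed gcs_to_bigquery.py from
   # the main-branch commit mentioned above and overwrite the installed copy.
   # Target path is taken from the traceback; run this on each worker pod.
   import urllib.request
   
   COMMIT = "23264fb820c179e9951ea9706f68b13a9b3fdbc0"
   SRC = (
       "https://raw.githubusercontent.com/apache/airflow/"
       f"{COMMIT}/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py"
   )
   DST = (
       "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/"
       "cloud/transfers/gcs_to_bigquery.py"
   )
   urllib.request.urlretrieve(SRC, DST)
   print(f"Patched {DST}")
   ```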




[GitHub] [airflow] watertree closed issue #28513: GCSToBigQueryOperator no longer loads DATASTORE_BACKUP formats

Posted by GitBox <gi...@apache.org>.
watertree closed issue #28513: GCSToBigQueryOperator no longer loads DATASTORE_BACKUP formats
URL: https://github.com/apache/airflow/issues/28513


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org