You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/07/11 20:19:09 UTC
[beam] branch master updated: Better error for external BigQuery tables. (#22178)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 983e5c0c372 Better error for external BigQuery tables. (#22178)
983e5c0c372 is described below
commit 983e5c0c372c3724ebb576c4d8f79a3ebd956d56
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Mon Jul 11 13:19:00 2022 -0700
Better error for external BigQuery tables. (#22178)
---
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 18 +++++++--
sdks/python/apache_beam/io/gcp/bigquery.py | 47 +++++++++++++---------
.../apache_beam/io/gcp/bigquery_read_internal.py | 47 +++++++++++++---------
3 files changed, 73 insertions(+), 39 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index de42bc0998f..18beda5c1c6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -211,9 +211,21 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
.setUseAvroLogicalTypes(useAvroLogicalTypes)
.setDestinationUris(ImmutableList.of(destinationUri));
- LOG.info("Starting BigQuery extract job: {}", jobId);
- jobService.startExtractJob(jobRef, extract);
- Job extractJob = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+ Job extractJob;
+ try {
+ LOG.info("Starting BigQuery extract job: {}", jobId);
+ jobService.startExtractJob(jobRef, extract);
+ extractJob = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+ } catch (IOException exn) {
+ // The error messages thrown in this case are generic and misleading, so leave this breadcrumb
+ // in case it's the root cause.
+ LOG.warn(
+ "Error extracting table: {} "
+ + "Note that external tables cannot be exported: "
+ + "https://cloud.google.com/bigquery/docs/external-tables#external_table_limitations",
+ exn);
+ throw exn;
+ }
if (BigQueryHelpers.parseStatus(extractJob) != Status.SUCCEEDED) {
throw new IOException(
String.format(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 5c2c832e3b0..fa28819253b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -986,24 +986,35 @@ class _CustomBigQuerySource(BoundedSource):
temp_location = self.options.view_as(GoogleCloudOptions).temp_location
gcs_location = bigquery_export_destination_uri(
self.gcs_location, temp_location, self._source_uuid)
- if self.use_json_exports:
- job_ref = bq.perform_extract_job([gcs_location],
- export_job_name,
- self.table_reference,
- bigquery_tools.FileFormat.JSON,
- project=self._get_project(),
- job_labels=job_labels,
- include_header=False)
- else:
- job_ref = bq.perform_extract_job([gcs_location],
- export_job_name,
- self.table_reference,
- bigquery_tools.FileFormat.AVRO,
- project=self._get_project(),
- include_header=False,
- job_labels=job_labels,
- use_avro_logical_types=True)
- bq.wait_for_bq_job(job_ref)
+ try:
+ if self.use_json_exports:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ self.table_reference,
+ bigquery_tools.FileFormat.JSON,
+ project=self._get_project(),
+ job_labels=job_labels,
+ include_header=False)
+ else:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ self.table_reference,
+ bigquery_tools.FileFormat.AVRO,
+ project=self._get_project(),
+ include_header=False,
+ job_labels=job_labels,
+ use_avro_logical_types=True)
+ bq.wait_for_bq_job(job_ref)
+ except Exception as exn: # pylint: disable=broad-except
+ # The error messages thrown in this case are generic and misleading,
+ # so leave this breadcrumb in case it's the root cause.
+ logging.warning(
+ "Error exporting table: %s. "
+ "Note that external tables cannot be exported: "
+ "https://cloud.google.com/bigquery/docs/external-tables"
+ "#external_table_limitations",
+ exn)
+ raise
metadata_list = FileSystems.match([gcs_location])[0].metadata_list
if isinstance(self.table_reference, vp.ValueProvider):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
index 22aacff044a..0ca5c2e69a0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
@@ -332,24 +332,35 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
self.gcs_location,
temp_location,
'%s%s' % (self._source_uuid, element.obj_id))
- if self.use_json_exports:
- job_ref = bq.perform_extract_job([gcs_location],
- export_job_name,
- table_reference,
- bigquery_tools.FileFormat.JSON,
- project=self._get_project(),
- job_labels=job_labels,
- include_header=False)
- else:
- job_ref = bq.perform_extract_job([gcs_location],
- export_job_name,
- table_reference,
- bigquery_tools.FileFormat.AVRO,
- project=self._get_project(),
- include_header=False,
- job_labels=job_labels,
- use_avro_logical_types=True)
- bq.wait_for_bq_job(job_ref)
+ try:
+ if self.use_json_exports:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ table_reference,
+ bigquery_tools.FileFormat.JSON,
+ project=self._get_project(),
+ job_labels=job_labels,
+ include_header=False)
+ else:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ table_reference,
+ bigquery_tools.FileFormat.AVRO,
+ project=self._get_project(),
+ include_header=False,
+ job_labels=job_labels,
+ use_avro_logical_types=True)
+ bq.wait_for_bq_job(job_ref)
+ except Exception as exn: # pylint: disable=broad-except
+ # The error messages thrown in this case are generic and misleading,
+ # so leave this breadcrumb in case it's the root cause.
+ logging.warning(
+ "Error exporting table: %s. "
+ "Note that external tables cannot be exported: "
+ "https://cloud.google.com/bigquery/docs/external-tables"
+ "#external_table_limitations",
+ exn)
+ raise
metadata_list = FileSystems.match([gcs_location])[0].metadata_list
if isinstance(table_reference, ValueProvider):