You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/07/11 20:19:09 UTC

[beam] branch master updated: Better error for external BigQuery tables. (#22178)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 983e5c0c372 Better error for external BigQuery tables. (#22178)
983e5c0c372 is described below

commit 983e5c0c372c3724ebb576c4d8f79a3ebd956d56
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Mon Jul 11 13:19:00 2022 -0700

    Better error for external BigQuery tables. (#22178)
---
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java    | 18 +++++++--
 sdks/python/apache_beam/io/gcp/bigquery.py         | 47 +++++++++++++---------
 .../apache_beam/io/gcp/bigquery_read_internal.py   | 47 +++++++++++++---------
 3 files changed, 73 insertions(+), 39 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index de42bc0998f..18beda5c1c6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -211,9 +211,21 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
             .setUseAvroLogicalTypes(useAvroLogicalTypes)
             .setDestinationUris(ImmutableList.of(destinationUri));
 
-    LOG.info("Starting BigQuery extract job: {}", jobId);
-    jobService.startExtractJob(jobRef, extract);
-    Job extractJob = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+    Job extractJob;
+    try {
+      LOG.info("Starting BigQuery extract job: {}", jobId);
+      jobService.startExtractJob(jobRef, extract);
+      extractJob = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+    } catch (IOException exn) {
+      // The error messages thrown in this case are generic and misleading, so leave this breadcrumb
+      // in case it's the root cause.
+      LOG.warn(
+          "Error extracting table: {} "
+              + "Note that external tables cannot be exported: "
+              + "https://cloud.google.com/bigquery/docs/external-tables#external_table_limitations",
+          exn);
+      throw exn;
+    }
     if (BigQueryHelpers.parseStatus(extractJob) != Status.SUCCEEDED) {
       throw new IOException(
           String.format(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 5c2c832e3b0..fa28819253b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -986,24 +986,35 @@ class _CustomBigQuerySource(BoundedSource):
     temp_location = self.options.view_as(GoogleCloudOptions).temp_location
     gcs_location = bigquery_export_destination_uri(
         self.gcs_location, temp_location, self._source_uuid)
-    if self.use_json_exports:
-      job_ref = bq.perform_extract_job([gcs_location],
-                                       export_job_name,
-                                       self.table_reference,
-                                       bigquery_tools.FileFormat.JSON,
-                                       project=self._get_project(),
-                                       job_labels=job_labels,
-                                       include_header=False)
-    else:
-      job_ref = bq.perform_extract_job([gcs_location],
-                                       export_job_name,
-                                       self.table_reference,
-                                       bigquery_tools.FileFormat.AVRO,
-                                       project=self._get_project(),
-                                       include_header=False,
-                                       job_labels=job_labels,
-                                       use_avro_logical_types=True)
-    bq.wait_for_bq_job(job_ref)
+    try:
+      if self.use_json_exports:
+        job_ref = bq.perform_extract_job([gcs_location],
+                                         export_job_name,
+                                         self.table_reference,
+                                         bigquery_tools.FileFormat.JSON,
+                                         project=self._get_project(),
+                                         job_labels=job_labels,
+                                         include_header=False)
+      else:
+        job_ref = bq.perform_extract_job([gcs_location],
+                                         export_job_name,
+                                         self.table_reference,
+                                         bigquery_tools.FileFormat.AVRO,
+                                         project=self._get_project(),
+                                         include_header=False,
+                                         job_labels=job_labels,
+                                         use_avro_logical_types=True)
+      bq.wait_for_bq_job(job_ref)
+    except Exception as exn:  # pylint: disable=broad-except
+      # The error messages thrown in this case are generic and misleading,
+      # so leave this breadcrumb in case it's the root cause.
+      logging.warning(
+          "Error exporting table: %s. "
+          "Note that external tables cannot be exported: "
+          "https://cloud.google.com/bigquery/docs/external-tables"
+          "#external_table_limitations",
+          exn)
+      raise
     metadata_list = FileSystems.match([gcs_location])[0].metadata_list
 
     if isinstance(self.table_reference, vp.ValueProvider):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
index 22aacff044a..0ca5c2e69a0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
@@ -332,24 +332,35 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
         self.gcs_location,
         temp_location,
         '%s%s' % (self._source_uuid, element.obj_id))
-    if self.use_json_exports:
-      job_ref = bq.perform_extract_job([gcs_location],
-                                       export_job_name,
-                                       table_reference,
-                                       bigquery_tools.FileFormat.JSON,
-                                       project=self._get_project(),
-                                       job_labels=job_labels,
-                                       include_header=False)
-    else:
-      job_ref = bq.perform_extract_job([gcs_location],
-                                       export_job_name,
-                                       table_reference,
-                                       bigquery_tools.FileFormat.AVRO,
-                                       project=self._get_project(),
-                                       include_header=False,
-                                       job_labels=job_labels,
-                                       use_avro_logical_types=True)
-    bq.wait_for_bq_job(job_ref)
+    try:
+      if self.use_json_exports:
+        job_ref = bq.perform_extract_job([gcs_location],
+                                         export_job_name,
+                                         table_reference,
+                                         bigquery_tools.FileFormat.JSON,
+                                         project=self._get_project(),
+                                         job_labels=job_labels,
+                                         include_header=False)
+      else:
+        job_ref = bq.perform_extract_job([gcs_location],
+                                         export_job_name,
+                                         table_reference,
+                                         bigquery_tools.FileFormat.AVRO,
+                                         project=self._get_project(),
+                                         include_header=False,
+                                         job_labels=job_labels,
+                                         use_avro_logical_types=True)
+      bq.wait_for_bq_job(job_ref)
+    except Exception as exn:  # pylint: disable=broad-except
+      # The error messages thrown in this case are generic and misleading,
+      # so leave this breadcrumb in case it's the root cause.
+      logging.warning(
+          "Error exporting table: %s. "
+          "Note that external tables cannot be exported: "
+          "https://cloud.google.com/bigquery/docs/external-tables"
+          "#external_table_limitations",
+          exn)
+      raise
     metadata_list = FileSystems.match([gcs_location])[0].metadata_list
 
     if isinstance(table_reference, ValueProvider):