You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/08 18:08:39 UTC

[beam] branch master updated: [BEAM-7173] Avoiding schema autodetection by default in WriteToBigQuery (#8473)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 20badac  [BEAM-7173] Avoiding schema autodetection by default in WriteToBigQuery (#8473)
20badac is described below

commit 20badacf954fa5ffe88b7eb4317797b0b0cc8fd0
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Wed May 8 11:08:28 2019 -0700

    [BEAM-7173] Avoiding schema autodetection by default in WriteToBigQuery (#8473)
    
    * [BEAM-7173] Avoiding schema autodetection by default in WriteToBigQuery
---
 sdks/python/apache_beam/io/gcp/bigquery.py          | 21 ++++++++++++++++++---
 .../apache_beam/io/gcp/bigquery_file_loads.py       |  8 ++------
 sdks/python/apache_beam/io/gcp/bigquery_test.py     |  1 +
 sdks/python/apache_beam/io/gcp/bigquery_tools.py    |  5 +++--
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 59e1126..18df195 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -265,6 +265,7 @@ __all__ = [
     'BigQuerySource',
     'BigQuerySink',
     'WriteToBigQuery',
+    'SCHEMA_AUTODETECT',
     ]
 
 
@@ -786,6 +787,7 @@ class BigQueryWriteFn(DoFn):
                   table_reference, schema)
 
     table_schema = self.get_table_schema(schema)
+
     if table_reference.projectId is None:
       table_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '')
@@ -880,6 +882,10 @@ class BigQueryWriteFn(DoFn):
                                     (destination, row))) for row in failed_rows]
 
 
+# Flag to be passed to WriteToBigQuery to force schema autodetection
+SCHEMA_AUTODETECT = 'SCHEMA_AUTODETECT'
+
+
 class WriteToBigQuery(PTransform):
   """Write data to BigQuery.
 
@@ -943,8 +949,9 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
         fields, repeated fields, or specifying a BigQuery mode for fields
         (mode will always be set to ``'NULLABLE'``).
         If a callable, then it should receive a destination (in the form of
-        a TableReference or a string, and return a str, dict or TableSchema, and
-        it should return a str, dict or TableSchema.
+        a TableReference or a string, and return a str, dict or TableSchema.
+        One may also pass ``SCHEMA_AUTODETECT`` here, and BigQuery will try to
+        infer the schema for the files that are being loaded.
       create_disposition (BigQueryDisposition): A string describing what
         happens if the table does not exist. Possible values are:
 
@@ -1004,7 +1011,10 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
         create_disposition)
     self.write_disposition = BigQueryDisposition.validate_write(
         write_disposition)
-    self.schema = WriteToBigQuery.get_dict_table_schema(schema)
+    if schema == SCHEMA_AUTODETECT:
+      self.schema = schema
+    else:
+      self.schema = WriteToBigQuery.get_dict_table_schema(schema)
     self.batch_size = batch_size
     self.kms_key = kms_key
     self.test_client = test_client
@@ -1126,6 +1136,11 @@ bigquery_v2_messages.TableSchema):
 
     method_to_use = self._compute_method(p, p.options)
 
+    if (method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS
+        and self.schema == SCHEMA_AUTODETECT):
+      raise ValueError('Schema auto-detection is not supported for streaming '
+                       'inserts into BigQuery. Only for File Loads.')
+
     if method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS:
       # TODO: Support load jobs for streaming pipelines.
       bigquery_write_fn = BigQueryWriteFn(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 807dc62..2af9d9f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -345,12 +345,8 @@ class TriggerLoadJobs(beam.DoFn):
 
   def display_data(self):
     result = {'create_disposition': str(self.create_disposition),
-              'write_disposition': str(self.write_disposition),
-              'additional_bq_parameters': str(self.additional_bq_parameters)}
-    if self.schema is not None:
-      result['schema'] = str(self.schema)
-    else:
-      result['schema'] = 'AUTODETECT'
+              'write_disposition': str(self.write_disposition)}
+    result['schema'] = str(self.schema)
 
     return result
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 3bd5177..40c5f67 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -550,6 +550,7 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
            | "WriteWithMultipleDests2" >> beam.io.gcp.bigquery.WriteToBigQuery(
                table=value_provider.StaticValueProvider(
                    str, '%s:%s' % (self.project, output_table_2)),
+               schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
                additional_bq_parameters=lambda _: additional_bq_parameters,
                method='FILE_LOADS'))
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 12a4a72..1999838 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -327,6 +327,7 @@ class BigQueryWrapper(object):
                        create_disposition=None,
                        additional_load_parameters=None):
     additional_load_parameters = additional_load_parameters or {}
+    job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
     request = bigquery.BigqueryJobsInsertRequest(
         projectId=project_id,
@@ -335,11 +336,11 @@ class BigQueryWrapper(object):
                 load=bigquery.JobConfigurationLoad(
                     sourceUris=source_uris,
                     destinationTable=table_reference,
-                    schema=schema,
+                    schema=job_schema,
                     writeDisposition=write_disposition,
                     createDisposition=create_disposition,
                     sourceFormat='NEWLINE_DELIMITED_JSON',
-                    autodetect=schema is None,
+                    autodetect=schema == 'SCHEMA_AUTODETECT',
                     **additional_load_parameters
                 )
             ),