You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/08 18:08:39 UTC
[beam] branch master updated: [BEAM-7173] Avoiding schema
autodetection by default in WriteToBigQuery (#8473)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 20badac [BEAM-7173] Avoiding schema autodetection by default in WriteToBigQuery (#8473)
20badac is described below
commit 20badacf954fa5ffe88b7eb4317797b0b0cc8fd0
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Wed May 8 11:08:28 2019 -0700
[BEAM-7173] Avoiding schema autodetection by default in WriteToBigQuery (#8473)
* [BEAM-7173] Avoiding schema autodetection by default in WriteToBigQuery
---
sdks/python/apache_beam/io/gcp/bigquery.py | 21 ++++++++++++++++++---
.../apache_beam/io/gcp/bigquery_file_loads.py | 8 ++------
sdks/python/apache_beam/io/gcp/bigquery_test.py | 1 +
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 5 +++--
4 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 59e1126..18df195 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -265,6 +265,7 @@ __all__ = [
'BigQuerySource',
'BigQuerySink',
'WriteToBigQuery',
+ 'SCHEMA_AUTODETECT',
]
@@ -786,6 +787,7 @@ class BigQueryWriteFn(DoFn):
table_reference, schema)
table_schema = self.get_table_schema(schema)
+
if table_reference.projectId is None:
table_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '')
@@ -880,6 +882,10 @@ class BigQueryWriteFn(DoFn):
(destination, row))) for row in failed_rows]
+# Flag to be passed to WriteToBigQuery to force schema autodetection
+SCHEMA_AUTODETECT = 'SCHEMA_AUTODETECT'
+
+
class WriteToBigQuery(PTransform):
"""Write data to BigQuery.
@@ -943,8 +949,9 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
fields, repeated fields, or specifying a BigQuery mode for fields
(mode will always be set to ``'NULLABLE'``).
If a callable, then it should receive a destination (in the form of
- a TableReference or a string, and return a str, dict or TableSchema, and
- it should return a str, dict or TableSchema.
+ a TableReference or a string), and return a str, dict or TableSchema.
+ One may also pass ``SCHEMA_AUTODETECT`` here, and BigQuery will try to
+ infer the schema for the files that are being loaded.
create_disposition (BigQueryDisposition): A string describing what
happens if the table does not exist. Possible values are:
@@ -1004,7 +1011,10 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
create_disposition)
self.write_disposition = BigQueryDisposition.validate_write(
write_disposition)
- self.schema = WriteToBigQuery.get_dict_table_schema(schema)
+ if schema == SCHEMA_AUTODETECT:
+ self.schema = schema
+ else:
+ self.schema = WriteToBigQuery.get_dict_table_schema(schema)
self.batch_size = batch_size
self.kms_key = kms_key
self.test_client = test_client
@@ -1126,6 +1136,11 @@ bigquery_v2_messages.TableSchema):
method_to_use = self._compute_method(p, p.options)
+ if (method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS
+ and self.schema == SCHEMA_AUTODETECT):
+ raise ValueError('Schema auto-detection is not supported for streaming '
+ 'inserts into BigQuery. Only for File Loads.')
+
if method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS:
# TODO: Support load jobs for streaming pipelines.
bigquery_write_fn = BigQueryWriteFn(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 807dc62..2af9d9f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -345,12 +345,8 @@ class TriggerLoadJobs(beam.DoFn):
def display_data(self):
result = {'create_disposition': str(self.create_disposition),
- 'write_disposition': str(self.write_disposition),
- 'additional_bq_parameters': str(self.additional_bq_parameters)}
- if self.schema is not None:
- result['schema'] = str(self.schema)
- else:
- result['schema'] = 'AUTODETECT'
+ 'write_disposition': str(self.write_disposition)}
+ result['schema'] = str(self.schema)
return result
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 3bd5177..40c5f67 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -550,6 +550,7 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
| "WriteWithMultipleDests2" >> beam.io.gcp.bigquery.WriteToBigQuery(
table=value_provider.StaticValueProvider(
str, '%s:%s' % (self.project, output_table_2)),
+ schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
additional_bq_parameters=lambda _: additional_bq_parameters,
method='FILE_LOADS'))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 12a4a72..1999838 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -327,6 +327,7 @@ class BigQueryWrapper(object):
create_disposition=None,
additional_load_parameters=None):
additional_load_parameters = additional_load_parameters or {}
+ job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
request = bigquery.BigqueryJobsInsertRequest(
projectId=project_id,
@@ -335,11 +336,11 @@ class BigQueryWrapper(object):
load=bigquery.JobConfigurationLoad(
sourceUris=source_uris,
destinationTable=table_reference,
- schema=schema,
+ schema=job_schema,
writeDisposition=write_disposition,
createDisposition=create_disposition,
sourceFormat='NEWLINE_DELIMITED_JSON',
- autodetect=schema is None,
+ autodetect=schema == 'SCHEMA_AUTODETECT',
**additional_load_parameters
)
),