Posted to commits@beam.apache.org by pa...@apache.org on 2020/03/06 01:00:32 UTC
[beam] branch master updated: Use Avro format for file loads to BigQuery
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0cc5771 Use Avro format for file loads to BigQuery
new 9a3ba93 Merge pull request #10979 from [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK
0cc5771 is described below
commit 0cc577134b9e98b32020efab2ca07ccd1665f7cb
Author: Chuck Yang <ch...@getcruise.com>
AuthorDate: Mon Feb 24 16:17:31 2020 -0800
Use Avro format for file loads to BigQuery
This commit adds support for Avro-based file loads in `WriteToBigQuery`
and makes Avro the default format for file loads into BigQuery.
Changes include:
* Add a `temp_file_format` option to `WriteToBigQuery` and
`BigQueryBatchFileLoads` to select which file format to use for
loading data (see the usage sketch after this list).
* Set the default `temp_file_format` to Avro.
* Move implementation of `get_table_schema_from_string`,
`table_schema_to_dict`, and `get_dict_table_schema` to
the `bigquery_tools` module.
* Add `bigquery_avro_tools` module with utilities for converting
BigQuery `TableSchema` to Avro `RecordSchema` (this is a port of
what's available in the Java SDK, with modified behavior for
logical types).
* Modify `WriteRecordsToFile` and `WriteGroupedRecordsToFile` to accept
`schema` and `file_format`, since Avro files must include schema headers
in order to be read by BigQuery.
* Parameterize relevant tests to check both JSON and Avro code paths.
* Add integration test using Avro-based file loads.
* Introduce `JsonRowWriter` and `AvroRowWriter` classes which implement
the `io.IOBase` interface and are used by `WriteRecordsToFile`
and `WriteGroupedRecordsToFile`.
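For orientation, a batch write under the new default can look roughly like the
sketch below. This is illustrative only and not part of the commit; the
project, bucket, dataset and table names are placeholders. With Avro as the
temporary file format, an explicit schema is required.

    import apache_beam as beam

    with beam.Pipeline(argv=['--experiments=use_beam_bq_sink']) as p:
        _ = (
            p
            | beam.Create([{'name': 'beam', 'value': 1.0}])
            | beam.io.WriteToBigQuery(
                'my-project:my_dataset.my_table',
                schema='name:STRING,value:FLOAT',
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                # Placeholder GCS path for the temporary (now Avro) load files.
                custom_gcs_temp_location='gs://my-bucket/tmp'))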
---
CHANGES.md | 2 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 125 ++++--------
.../apache_beam/io/gcp/bigquery_avro_tools.py | 132 +++++++++++++
.../apache_beam/io/gcp/bigquery_avro_tools_test.py | 181 ++++++++++++++++++
.../apache_beam/io/gcp/bigquery_file_loads.py | 91 ++++++---
.../apache_beam/io/gcp/bigquery_file_loads_test.py | 129 ++++++++-----
sdks/python/apache_beam/io/gcp/bigquery_test.py | 125 +++++++++++-
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 211 ++++++++++++++++++++-
.../apache_beam/io/gcp/bigquery_tools_test.py | 67 +++++++
.../apache_beam/io/gcp/bigquery_write_it_test.py | 7 +-
sdks/python/apache_beam/io/localfilesystem.py | 3 +-
sdks/python/scripts/generate_pydoc.sh | 1 +
sdks/python/scripts/run_integration_test.sh | 1 +
13 files changed, 902 insertions(+), 173 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 44fda85..8e37348 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -38,7 +38,7 @@
* New AnnotateVideo & AnnotateVideoWithContext PTransform's that integrates GCP Video Intelligence functionality. (Python) ([BEAM-9146](https://issues.apache.org/jira/browse/BEAM-9146))
* New AnnotateImage & AnnotateImageWithContext PTransform's for element-wise & batch image annotation using Google Cloud Vision API. (Python) ([BEAM-9247](https://issues.apache.org/jira/browse/BEAM-9247))
* Added a PTransform for inspection and deidentification of text using Google Cloud DLP. (Python) ([BEAM-9258](https://issues.apache.org/jira/browse/BEAM-9258))
-* New AnnotateText PTransform that integrates Google Cloud Natural Language functionality (Python) ([BEAM-9248](https://issues.apache.org/jira/browse/BEAM-9248))
+* Added ability to write to BigQuery via Avro file loads (Python) ([BEAM-8841](https://issues.apache.org/jira/browse/BEAM-8841))
### Breaking Changes
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 74c2293..322311c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -720,7 +720,7 @@ class _CustomBigQuerySource(BoundedSource):
job_ref = bq.perform_extract_job([self.gcs_location],
job_id,
self.table_reference,
- bigquery_tools.ExportFileFormat.JSON,
+ bigquery_tools.FileFormat.JSON,
include_header=False)
bq.wait_for_bq_job(job_ref)
metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
@@ -1239,7 +1239,8 @@ class WriteToBigQuery(PTransform):
table_side_inputs=None,
schema_side_inputs=None,
triggering_frequency=None,
- validate=True):
+ validate=True,
+ temp_file_format=None):
"""Initialize a WriteToBigQuery transform.
Args:
@@ -1272,8 +1273,9 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
(mode will always be set to ``'NULLABLE'``).
If a callable, then it should receive a destination (in the form of
a TableReference or a string, and return a str, dict or TableSchema.
- One may also pass ``SCHEMA_AUTODETECT`` here, and BigQuery will try to
- infer the schema for the files that are being loaded.
+ One may also pass ``SCHEMA_AUTODETECT`` here when using JSON-based
+ file loads, and BigQuery will try to infer the schema for the files
+ that are being loaded.
create_disposition (BigQueryDisposition): A string describing what
happens if the table does not exist. Possible values are:
@@ -1338,6 +1340,12 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
about BigQuery quotas.
validate: Indicates whether to perform validation checks on
inputs. This parameter is primarily used for testing.
+ temp_file_format: The format to use for file loads into BigQuery. The
+ options are NEWLINE_DELIMITED_JSON or AVRO, with AVRO being used
+ by default. For advantages and limitations of the two formats, see
+ https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
+ and
+ https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json.
"""
self.table_reference = bigquery_tools.parse_table_reference(
table, dataset, project)
@@ -1348,7 +1356,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
if schema == SCHEMA_AUTODETECT:
self.schema = schema
else:
- self.schema = WriteToBigQuery.get_dict_table_schema(schema)
+ self.schema = bigquery_tools.get_dict_table_schema(schema)
self.batch_size = batch_size
self.kms_key = kms_key
self.test_client = test_client
@@ -1361,87 +1369,18 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
self.triggering_frequency = triggering_frequency
self.insert_retry_strategy = insert_retry_strategy
self._validate = validate
+ self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.AVRO
self.additional_bq_parameters = additional_bq_parameters or {}
self.table_side_inputs = table_side_inputs or ()
self.schema_side_inputs = schema_side_inputs or ()
- @staticmethod
- def get_table_schema_from_string(schema):
- """Transform the string table schema into a
- :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema` instance.
-
- Args:
- schema (str): The sting schema to be used if the BigQuery table to write
- has to be created.
-
- Returns:
- ~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema:
- The schema to be used if the BigQuery table to write has to be created
- but in the :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema` format.
- """
- table_schema = bigquery.TableSchema()
- schema_list = [s.strip() for s in schema.split(',')]
- for field_and_type in schema_list:
- field_name, field_type = field_and_type.split(':')
- field_schema = bigquery.TableFieldSchema()
- field_schema.name = field_name
- field_schema.type = field_type
- field_schema.mode = 'NULLABLE'
- table_schema.fields.append(field_schema)
- return table_schema
-
- @staticmethod
- def table_schema_to_dict(table_schema):
- """Create a dictionary representation of table schema for serialization
- """
- def get_table_field(field):
- """Create a dictionary representation of a table field
- """
- result = {}
- result['name'] = field.name
- result['type'] = field.type
- result['mode'] = getattr(field, 'mode', 'NULLABLE')
- if hasattr(field, 'description') and field.description is not None:
- result['description'] = field.description
- if hasattr(field, 'fields') and field.fields:
- result['fields'] = [get_table_field(f) for f in field.fields]
- return result
-
- if not isinstance(table_schema, bigquery.TableSchema):
- raise ValueError("Table schema must be of the type bigquery.TableSchema")
- schema = {'fields': []}
- for field in table_schema.fields:
- schema['fields'].append(get_table_field(field))
- return schema
-
- @staticmethod
- def get_dict_table_schema(schema):
- """Transform the table schema into a dictionary instance.
-
- Args:
- schema (~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema):
- The schema to be used if the BigQuery table to write has to be created.
- This can either be a dict or string or in the TableSchema format.
-
- Returns:
- Dict[str, Any]: The schema to be used if the BigQuery table to write has
- to be created but in the dictionary format.
- """
- if (isinstance(schema, (dict, vp.ValueProvider)) or callable(schema) or
- schema is None):
- return schema
- elif isinstance(schema, (str, unicode)):
- table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
- return WriteToBigQuery.table_schema_to_dict(table_schema)
- elif isinstance(schema, bigquery.TableSchema):
- return WriteToBigQuery.table_schema_to_dict(schema)
- else:
- raise TypeError('Unexpected schema argument: %s.' % schema)
+ # Dict/schema methods were moved to bigquery_tools, but keep references
+ # here for backward compatibility.
+ get_table_schema_from_string = \
+ staticmethod(bigquery_tools.get_table_schema_from_string)
+ table_schema_to_dict = staticmethod(bigquery_tools.table_schema_to_dict)
+ get_dict_table_schema = staticmethod(bigquery_tools.get_dict_table_schema)
def _compute_method(self, experiments, is_streaming_pipeline):
# If the new BQ sink is not activated for experiment flags, then we use
@@ -1469,13 +1408,12 @@ bigquery_v2_messages.TableSchema):
method_to_use = self._compute_method(experiments, is_streaming_pipeline)
- if (method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS and
- self.schema == SCHEMA_AUTODETECT):
- raise ValueError(
- 'Schema auto-detection is not supported for streaming '
- 'inserts into BigQuery. Only for File Loads.')
-
if method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS:
+ if self.schema == SCHEMA_AUTODETECT:
+ raise ValueError(
+ 'Schema auto-detection is not supported for streaming '
+ 'inserts into BigQuery. Only for File Loads.')
+
if self.triggering_frequency:
raise ValueError(
'triggering_frequency can only be used with '
@@ -1496,13 +1434,26 @@ bigquery_v2_messages.TableSchema):
return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
else:
+ if self._temp_file_format == bigquery_tools.FileFormat.AVRO:
+ if self.schema == SCHEMA_AUTODETECT:
+ raise ValueError(
+ 'Schema auto-detection is not supported when using Avro based '
+ 'file loads into BigQuery. Please specify a schema or set '
+ 'temp_file_format="NEWLINE_DELIMITED_JSON"')
+ if self.schema is None:
+ raise ValueError(
+ 'A schema must be provided when writing to BigQuery using '
+ 'Avro based file loads')
+
from apache_beam.io.gcp import bigquery_file_loads
+
return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
destination=self.table_reference,
schema=self.schema,
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
triggering_frequency=self.triggering_frequency,
+ temp_file_format=self._temp_file_format,
max_file_size=self.max_file_size,
max_files_per_bundle=self.max_files_per_bundle,
custom_gcs_temp_location=self.custom_gcs_temp_location,
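In practice, schema auto-detection stays tied to JSON-based loads. Below is a
hedged sketch (placeholder table and bucket names, not part of the commit) of
keeping `SCHEMA_AUTODETECT` by opting back into the JSON temp file format;
with the default Avro format, the same transform raises a `ValueError` when
it is applied.

    import apache_beam as beam
    from apache_beam.io.gcp import bigquery_tools

    with beam.Pipeline(argv=['--experiments=use_beam_bq_sink']) as p:
        _ = (
            p
            | beam.Create([{'name': 'beam'}])
            | beam.io.WriteToBigQuery(
                'my-project:my_dataset.my_table',
                # Auto-detection is only supported for JSON-based file loads;
                # with FileFormat.AVRO this combination raises a ValueError.
                schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                temp_file_format=bigquery_tools.FileFormat.JSON,
                custom_gcs_temp_location='gs://my-bucket/tmp'))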
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
new file mode 100644
index 0000000..aa11d10
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tools used tool work with Avro files in the context of BigQuery.
+
+Classes, constants and functions in this file are experimental and have no
+backwards compatibility guarantees.
+
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+BIG_QUERY_TO_AVRO_TYPES = {
+ "RECORD": "record",
+ "STRING": "string",
+ "BOOLEAN": "boolean",
+ "BYTES": "bytes",
+ "FLOAT": "double",
+ "INTEGER": "long",
+ "TIME": {
+ "type": "long",
+ "logicalType": "time-micros",
+ },
+ "TIMESTAMP": {
+ "type": "long",
+ "logicalType": "timestamp-micros",
+ },
+ "DATE": {
+ "type": "int",
+ "logicalType": "date",
+ },
+ "DATETIME": "string",
+ "NUMERIC": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type
+ "precision": 38,
+ "scale": 9,
+ },
+ "GEOGRAPHY": "string",
+}
+
+
+def get_record_schema_from_dict_table_schema(
+ schema_name, table_schema, namespace="apache_beam.io.gcp.bigquery"):
+ # type: (Text, Dict[Text, Any], Text) -> Dict[Text, Any]
+
+ """Convert a table schema into an Avro schema.
+
+ Args:
+ schema_name (Text): The name of the record.
+ table_schema (Dict[Text, Any]): A BigQuery table schema in dict form.
+ namespace (Text): The namespace of the Avro schema.
+
+ Returns:
+ Dict[Text, Any]: The schema as an Avro RecordSchema.
+ """
+ avro_fields = [
+ table_field_to_avro_field(field, ".".join((namespace, schema_name)))
+ for field in table_schema["fields"]
+ ]
+
+ return {
+ "type": "record",
+ "name": schema_name,
+ "fields": avro_fields,
+ "doc": "Translated Avro Schema for {}".format(schema_name),
+ "namespace": namespace,
+ }
+
+
+def table_field_to_avro_field(table_field, namespace):
+ # type: (Dict[Text, Any], Text) -> Dict[Text, Any]
+
+ """Convert a BigQuery field to an avro field.
+
+ Args:
+ table_field (Dict[Text, Any]): A BigQuery field in dict form.
+
+ Returns:
+ Dict[Text, Any]: An equivalent Avro field in dict form.
+ """
+ assert "type" in table_field, \
+ "Unable to get type for table field {}".format(table_field)
+ assert table_field["type"] in BIG_QUERY_TO_AVRO_TYPES, \
+ "Unable to map BigQuery field type {} to avro type".format(
+ table_field["type"])
+
+ avro_type = BIG_QUERY_TO_AVRO_TYPES[table_field["type"]]
+
+ if avro_type == "record":
+ element_type = get_record_schema_from_dict_table_schema(
+ table_field["name"],
+ table_field,
+ namespace=".".join((namespace, table_field["name"])))
+ else:
+ element_type = avro_type
+
+ field_mode = table_field.get("mode", "NULLABLE")
+
+ if field_mode in (None, "NULLABLE"):
+ field_type = ["null", element_type]
+ elif field_mode == "REQUIRED":
+ field_type = element_type
+ elif field_mode == "REPEATED":
+ field_type = {"type": "array", "items": element_type}
+ else:
+ raise ValueError("Unknown BigQuery field mode: {}".format(field_mode))
+
+ avro_field = {"type": field_type, "name": table_field["name"]}
+
+ doc = table_field.get("description")
+ if doc:
+ avro_field["doc"] = doc
+
+ return avro_field
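As an informal illustration of the module above (field names invented for the
example, not taken from the commit), converting a small dict table schema
yields a plain dict that `fastavro` can parse:

    from apache_beam.io.gcp import bigquery_avro_tools

    table_schema = {
        'fields': [
            {'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'score', 'type': 'FLOAT'},  # mode defaults to NULLABLE
        ]
    }
    avro_schema = bigquery_avro_tools.get_record_schema_from_dict_table_schema(
        'root', table_schema)
    # avro_schema is now roughly:
    # {'type': 'record', 'name': 'root',
    #  'fields': [{'type': 'string', 'name': 'name'},
    #             {'type': ['null', 'double'], 'name': 'score'}],
    #  'doc': 'Translated Avro Schema for root',
    #  'namespace': 'apache_beam.io.gcp.bigquery'}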
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py
new file mode 100644
index 0000000..bfb1de1
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+
+import json
+import logging
+import unittest
+
+import avro.schema
+import fastavro
+
+from apache_beam.io.gcp import bigquery_avro_tools
+from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.bigquery_test import HttpError
+from apache_beam.io.gcp.internal.clients import bigquery
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryToAvroSchema(unittest.TestCase):
+ def test_convert_bigquery_schema_to_avro_schema(self):
+ subfields = [
+ bigquery.TableFieldSchema(
+ name="species", type="STRING", mode="NULLABLE"),
+ ]
+
+ fields = [
+ bigquery.TableFieldSchema(
+ name="number", type="INTEGER", mode="REQUIRED"),
+ bigquery.TableFieldSchema(
+ name="species", type="STRING", mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name="quality", type="FLOAT"), # default to NULLABLE
+ bigquery.TableFieldSchema(
+ name="quantity", type="INTEGER"), # default to NULLABLE
+ bigquery.TableFieldSchema(
+ name="birthday", type="TIMESTAMP", mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name="birthdayMoney", type="NUMERIC", mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name="flighted", type="BOOLEAN", mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name="sound", type="BYTES", mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name="anniversaryDate", type="DATE", mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name="anniversaryDatetime", type="DATETIME", mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name="anniversaryTime", type="TIME", mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name="scion", type="RECORD", mode="NULLABLE", fields=subfields),
+ bigquery.TableFieldSchema(
+ name="associates", type="RECORD", mode="REPEATED", fields=subfields),
+ bigquery.TableFieldSchema(
+ name="geoPositions", type="GEOGRAPHY", mode="NULLABLE"),
+ ]
+
+ table_schema = bigquery.TableSchema(fields=fields)
+ avro_schema = bigquery_avro_tools.get_record_schema_from_dict_table_schema(
+ "root", bigquery_tools.get_dict_table_schema(table_schema))
+
+ # Test that schema can be parsed correctly by fastavro
+ fastavro.parse_schema(avro_schema)
+
+ # Test that schema can be parsed correctly by avro
+ parsed_schema = avro.schema.parse(json.dumps(avro_schema))
+ # Avro RecordSchema provides field_map in py3 and fields_dict in py2
+ field_map = getattr(parsed_schema, "field_map", None) or \
+ getattr(parsed_schema, "fields_dict", None)
+
+ self.assertEqual(
+ field_map["number"].type, avro.schema.parse(json.dumps("long")))
+ self.assertEqual(
+ field_map["species"].type,
+ avro.schema.parse(json.dumps(["null", "string"])))
+ self.assertEqual(
+ field_map["quality"].type,
+ avro.schema.parse(json.dumps(["null", "double"])))
+ self.assertEqual(
+ field_map["quantity"].type,
+ avro.schema.parse(json.dumps(["null", "long"])))
+ self.assertEqual(
+ field_map["birthday"].type,
+ avro.schema.parse(
+ json.dumps(
+ ["null", {
+ "type": "long", "logicalType": "timestamp-micros"
+ }])))
+ self.assertEqual(
+ field_map["birthdayMoney"].type,
+ avro.schema.parse(
+ json.dumps([
+ "null",
+ {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 38,
+ "scale": 9
+ }
+ ])))
+ self.assertEqual(
+ field_map["flighted"].type,
+ avro.schema.parse(json.dumps(["null", "boolean"])))
+ self.assertEqual(
+ field_map["sound"].type,
+ avro.schema.parse(json.dumps(["null", "bytes"])))
+ self.assertEqual(
+ field_map["anniversaryDate"].type,
+ avro.schema.parse(
+ json.dumps(["null", {
+ "type": "int", "logicalType": "date"
+ }])))
+ self.assertEqual(
+ field_map["anniversaryDatetime"].type,
+ avro.schema.parse(json.dumps(["null", "string"])))
+ self.assertEqual(
+ field_map["anniversaryTime"].type,
+ avro.schema.parse(
+ json.dumps(["null", {
+ "type": "long", "logicalType": "time-micros"
+ }])))
+ self.assertEqual(
+ field_map["geoPositions"].type,
+ avro.schema.parse(json.dumps(["null", "string"])))
+
+ self.assertEqual(
+ field_map["scion"].type,
+ avro.schema.parse(
+ json.dumps([
+ "null",
+ {
+ "type": "record",
+ "name": "scion",
+ "fields": [
+ {
+ "type": ["null", "string"],
+ "name": "species",
+ },
+ ],
+ "doc": "Translated Avro Schema for scion",
+ "namespace": "apache_beam.io.gcp.bigquery.root.scion",
+ },
+ ])))
+
+ self.assertEqual(
+ field_map["associates"].type,
+ avro.schema.parse(
+ json.dumps({
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "associates",
+ "fields": [
+ {
+ "type": ["null", "string"],
+ "name": "species",
+ },
+ ],
+ "doc": "Translated Avro Schema for associates",
+ "namespace": "apache_beam.io.gcp.bigquery.root.associates",
+ }
+ })))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 04a4928..097f42d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -99,7 +99,12 @@ def file_prefix_generator(
return _generate_file_prefix
-def _make_new_file_writer(file_prefix, destination):
+def _make_new_file_writer(
+ file_prefix,
+ destination,
+ file_format,
+ schema=None,
+ schema_side_inputs=tuple()):
destination = bigquery_tools.get_hashable_destination(destination)
# Windows does not allow : on filenames. Replacing with underscore.
@@ -115,7 +120,23 @@ def _make_new_file_writer(file_prefix, destination):
file_name = str(uuid.uuid4())
file_path = fs.FileSystems.join(file_prefix, destination, file_name)
- return file_path, fs.FileSystems.create(file_path, 'application/text')
+ if file_format == bigquery_tools.FileFormat.AVRO:
+ if callable(schema):
+ schema = schema(destination, *schema_side_inputs)
+ elif isinstance(schema, vp.ValueProvider):
+ schema = schema.get()
+
+ writer = bigquery_tools.AvroRowWriter(
+ fs.FileSystems.create(file_path, "application/avro"), schema)
+ elif file_format == bigquery_tools.FileFormat.JSON:
+ writer = bigquery_tools.JsonRowWriter(
+ fs.FileSystems.create(file_path, "application/text"))
+ else:
+ raise ValueError((
+ 'Only AVRO and JSON are supported as intermediate formats for '
+ 'BigQuery WriteRecordsToFile, got: {}.').format(file_format))
+
+ return file_path, writer
def _bq_uuid(seed=None):
@@ -166,9 +187,10 @@ class WriteRecordsToFile(beam.DoFn):
def __init__(
self,
+ schema,
max_files_per_bundle=_DEFAULT_MAX_WRITERS_PER_BUNDLE,
max_file_size=_DEFAULT_MAX_FILE_SIZE,
- coder=None):
+ file_format=None):
"""Initialize a :class:`WriteRecordsToFile`.
Args:
@@ -179,21 +201,22 @@ class WriteRecordsToFile(beam.DoFn):
an export job.
"""
+ self.schema = schema
self.max_files_per_bundle = max_files_per_bundle
self.max_file_size = max_file_size
- self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+ self.file_format = file_format or bigquery_tools.FileFormat.AVRO
def display_data(self):
return {
'max_files_per_bundle': self.max_files_per_bundle,
'max_file_size': str(self.max_file_size),
- 'coder': self.coder.__class__.__name__
+ 'file_format': self.file_format,
}
def start_bundle(self):
self._destination_to_file_writer = {}
- def process(self, element, file_prefix):
+ def process(self, element, file_prefix, *schema_side_inputs):
"""Take a tuple with (destination, row) and write to file or spill out.
Destination may be a ``TableReference`` or a string, and row is a
@@ -204,7 +227,11 @@ class WriteRecordsToFile(beam.DoFn):
if destination not in self._destination_to_file_writer:
if len(self._destination_to_file_writer) < self.max_files_per_bundle:
self._destination_to_file_writer[destination] = _make_new_file_writer(
- file_prefix, destination)
+ file_prefix,
+ destination,
+ self.file_format,
+ self.schema,
+ schema_side_inputs)
else:
yield pvalue.TaggedOutput(
WriteRecordsToFile.UNWRITTEN_RECORD_TAG, element)
@@ -213,8 +240,7 @@ class WriteRecordsToFile(beam.DoFn):
(file_path, writer) = self._destination_to_file_writer[destination]
# TODO(pabloem): Is it possible for this to throw exception?
- writer.write(self.coder.encode(row))
- writer.write(b'\n')
+ writer.write(row)
file_size = writer.tell()
if file_size > self.max_file_size:
@@ -246,11 +272,13 @@ class WriteGroupedRecordsToFile(beam.DoFn):
Experimental; no backwards compatibility guarantees.
"""
- def __init__(self, max_file_size=_DEFAULT_MAX_FILE_SIZE, coder=None):
+ def __init__(
+ self, schema, max_file_size=_DEFAULT_MAX_FILE_SIZE, file_format=None):
+ self.schema = schema
self.max_file_size = max_file_size
- self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+ self.file_format = file_format or bigquery_tools.FileFormat.AVRO
- def process(self, element, file_prefix):
+ def process(self, element, file_prefix, *schema_side_inputs):
destination = element[0]
rows = element[1]
@@ -258,10 +286,14 @@ class WriteGroupedRecordsToFile(beam.DoFn):
for row in rows:
if writer is None:
- (file_path, writer) = _make_new_file_writer(file_prefix, destination)
+ (file_path, writer) = _make_new_file_writer(
+ file_prefix,
+ destination,
+ self.file_format,
+ self.schema,
+ schema_side_inputs)
- writer.write(self.coder.encode(row))
- writer.write(b'\n')
+ writer.write(row)
file_size = writer.tell()
if file_size > self.max_file_size:
@@ -375,11 +407,13 @@ class TriggerLoadJobs(beam.DoFn):
write_disposition=None,
test_client=None,
temporary_tables=False,
- additional_bq_parameters=None):
+ additional_bq_parameters=None,
+ source_format=None):
self.schema = schema
self.test_client = test_client
self.temporary_tables = temporary_tables
self.additional_bq_parameters = additional_bq_parameters or {}
+ self.source_format = source_format
if self.temporary_tables:
# If we are loading into temporary tables, we rely on the default create
# and write dispositions, which mean that a new table will be created.
@@ -464,7 +498,8 @@ class TriggerLoadJobs(beam.DoFn):
schema=schema,
write_disposition=self.write_disposition,
create_disposition=create_disposition,
- additional_load_parameters=additional_parameters)
+ additional_load_parameters=additional_parameters,
+ source_format=self.source_format)
yield (destination, job_reference)
@@ -578,7 +613,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
create_disposition=None,
write_disposition=None,
triggering_frequency=None,
- coder=None,
+ temp_file_format=None,
max_file_size=None,
max_files_per_bundle=None,
max_partition_size=None,
@@ -610,7 +645,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
self.test_client = test_client
self.schema = schema
- self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+ self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.AVRO
# If we have multiple destinations, then we will have multiple load jobs,
# thus we will need temporary tables for atomicity.
@@ -674,10 +709,12 @@ class BigQueryBatchFileLoads(beam.PTransform):
destination_data_kv_pc
| beam.ParDo(
WriteRecordsToFile(
+ schema=self.schema,
max_files_per_bundle=self.max_files_per_bundle,
max_file_size=self.max_file_size,
- coder=self.coder),
- file_prefix=file_prefix_pcv).with_outputs(
+ file_format=self._temp_file_format),
+ file_prefix_pcv,
+ *self.schema_side_inputs).with_outputs(
WriteRecordsToFile.UNWRITTEN_RECORD_TAG,
WriteRecordsToFile.WRITTEN_FILE_TAG))
@@ -697,8 +734,10 @@ class BigQueryBatchFileLoads(beam.PTransform):
| "GroupShardedRows" >> beam.GroupByKey()
| "DropShardNumber" >> beam.Map(lambda x: (x[0][0], x[1]))
| "WriteGroupedRecordsToFile" >> beam.ParDo(
- WriteGroupedRecordsToFile(coder=self.coder),
- file_prefix=file_prefix_pcv))
+ WriteGroupedRecordsToFile(
+ schema=self.schema, file_format=self._temp_file_format),
+ file_prefix_pcv,
+ *self.schema_side_inputs))
all_destination_file_pairs_pc = (
(destination_files_kv_pc, more_destination_files_kv_pc)
@@ -748,7 +787,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
create_disposition=self.create_disposition,
test_client=self.test_client,
temporary_tables=True,
- additional_bq_parameters=self.additional_bq_parameters),
+ additional_bq_parameters=self.additional_bq_parameters,
+ source_format=self._temp_file_format),
load_job_name_pcv,
*self.schema_side_inputs).with_outputs(
TriggerLoadJobs.TEMP_TABLES, main='main'))
@@ -796,7 +836,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
create_disposition=self.create_disposition,
test_client=self.test_client,
temporary_tables=False,
- additional_bq_parameters=self.additional_bq_parameters),
+ additional_bq_parameters=self.additional_bq_parameters,
+ source_format=self._temp_file_format),
load_job_name_pcv,
*self.schema_side_inputs))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 7be618d..29fdfa5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -21,7 +21,6 @@
from __future__ import absolute_import
-import json
import logging
import os
import random
@@ -34,9 +33,10 @@ from hamcrest.core import assert_that as hamcrest_assert
from hamcrest.core.core.allof import all_of
from hamcrest.core.core.is_ import is_
from nose.plugins.attrib import attr
+from parameterized import param
+from parameterized import parameterized
import apache_beam as beam
-from apache_beam import coders
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
from apache_beam.io.gcp import bigquery_file_loads as bqfl
from apache_beam.io.gcp import bigquery
@@ -57,59 +57,68 @@ from apache_beam.typehints.typehints import Tuple
try:
from apitools.base.py.exceptions import HttpError
except ImportError:
- HttpError = None
+ raise unittest.SkipTest('GCP dependencies are not installed')
_LOGGER = logging.getLogger(__name__)
_DESTINATION_ELEMENT_PAIRS = [
# DESTINATION 1
- ('project1:dataset1.table1', '{"name":"beam", "language":"py"}'),
- ('project1:dataset1.table1', '{"name":"beam", "language":"java"}'),
- ('project1:dataset1.table1', '{"name":"beam", "language":"go"}'),
- ('project1:dataset1.table1', '{"name":"flink", "language":"java"}'),
- ('project1:dataset1.table1', '{"name":"flink", "language":"scala"}'),
+ ('project1:dataset1.table1', {
+ 'name': 'beam', 'language': 'py'
+ }),
+ ('project1:dataset1.table1', {
+ 'name': 'beam', 'language': 'java'
+ }),
+ ('project1:dataset1.table1', {
+ 'name': 'beam', 'language': 'go'
+ }),
+ ('project1:dataset1.table1', {
+ 'name': 'flink', 'language': 'java'
+ }),
+ ('project1:dataset1.table1', {
+ 'name': 'flink', 'language': 'scala'
+ }),
# DESTINATION 3
- ('project1:dataset1.table3', '{"name":"spark", "language":"scala"}'),
+ ('project1:dataset1.table3', {
+ 'name': 'spark', 'language': 'scala'
+ }),
# DESTINATION 1
- ('project1:dataset1.table1', '{"name":"spark", "language":"py"}'),
- ('project1:dataset1.table1', '{"name":"spark", "language":"scala"}'),
+ ('project1:dataset1.table1', {
+ 'name': 'spark', 'language': 'py'
+ }),
+ ('project1:dataset1.table1', {
+ 'name': 'spark', 'language': 'scala'
+ }),
# DESTINATION 2
- ('project1:dataset1.table2', '{"name":"beam", "foundation":"apache"}'),
- ('project1:dataset1.table2', '{"name":"flink", "foundation":"apache"}'),
- ('project1:dataset1.table2', '{"name":"spark", "foundation":"apache"}'),
-]
-
-_NAME_LANGUAGE_ELEMENTS = [
- json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS
- if "language" in elm[1]
+ ('project1:dataset1.table2', {
+ 'name': 'beam', 'foundation': 'apache'
+ }),
+ ('project1:dataset1.table2', {
+ 'name': 'flink', 'foundation': 'apache'
+ }),
+ ('project1:dataset1.table2', {
+ 'name': 'spark', 'foundation': 'apache'
+ }),
]
_DISTINCT_DESTINATIONS = list(
set([elm[0] for elm in _DESTINATION_ELEMENT_PAIRS]))
-_ELEMENTS = list([json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS])
-
+_ELEMENTS = [elm[1] for elm in _DESTINATION_ELEMENT_PAIRS]
-class CustomRowCoder(coders.Coder):
- """
- Custom row coder that also expects strings as input data when encoding
- """
- def __init__(self):
- self.coder = bigquery_tools.RowAsDictJsonCoder()
+_ELEMENTS_SCHEMA = bigquery.WriteToBigQuery.get_dict_table_schema(
+ bigquery_api.TableSchema(
+ fields=[
+ bigquery_api.TableFieldSchema(
+ name="name", type="STRING", mode="REQUIRED"),
+ bigquery_api.TableFieldSchema(name="language", type="STRING"),
+ bigquery_api.TableFieldSchema(name="foundation", type="STRING"),
+ ]))
- def encode(self, table_row):
- if type(table_row) == str:
- table_row = json.loads(table_row)
- return self.coder.encode(table_row)
- def decode(self, encoded_table_row):
- return self.coder.decode(encoded_table_row)
-
-
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
maxDiff = None
@@ -127,10 +136,16 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
checks(output_pcs)
return output_pcs
- def test_files_created(self):
+ @parameterized.expand([
+ param(file_format=bigquery_tools.FileFormat.AVRO),
+ param(file_format=bigquery_tools.FileFormat.JSON),
+ param(file_format=None),
+ ])
+ def test_files_created(self, file_format):
"""Test that the files are created and written."""
- fn = bqfl.WriteRecordsToFile(coder=CustomRowCoder())
+ fn = bqfl.WriteRecordsToFile(
+ schema=_ELEMENTS_SCHEMA, file_format=file_format)
self.tmpdir = self._new_tempdir()
def check_files_created(output_pcs):
@@ -161,7 +176,7 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
file length is very small, so only a couple records fit in each file.
"""
- fn = bqfl.WriteRecordsToFile(max_file_size=50, coder=CustomRowCoder())
+ fn = bqfl.WriteRecordsToFile(schema=_ELEMENTS_SCHEMA, max_file_size=300)
self.tmpdir = self._new_tempdir()
def check_many_files(output_pcs):
@@ -188,7 +203,11 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
self._consume_input(fn, check_many_files)
- def test_records_are_spilled(self):
+ @parameterized.expand([
+ param(file_format=bigquery_tools.FileFormat.AVRO),
+ param(file_format=bigquery_tools.FileFormat.JSON),
+ ])
+ def test_records_are_spilled(self, file_format):
"""Forces records to be written to many files.
For each destination multiple files are necessary, and at most two files
@@ -196,7 +215,10 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
processing.
"""
- fn = bqfl.WriteRecordsToFile(max_files_per_bundle=2, coder=CustomRowCoder())
+ fn = bqfl.WriteRecordsToFile(
+ schema=_ELEMENTS_SCHEMA,
+ max_files_per_bundle=2,
+ file_format=file_format)
self.tmpdir = self._new_tempdir()
def check_many_files(output_pcs):
@@ -231,7 +253,6 @@ class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
self._consume_input(fn, check_many_files)
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
def _consume_input(self, fn, input, checks):
if checks is None:
@@ -247,10 +268,16 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
checks(res)
return res
- def test_files_are_created(self):
+ @parameterized.expand([
+ param(file_format=bigquery_tools.FileFormat.AVRO),
+ param(file_format=bigquery_tools.FileFormat.JSON),
+ param(file_format=None),
+ ])
+ def test_files_are_created(self, file_format):
"""Test that the files are created and written."""
- fn = bqfl.WriteGroupedRecordsToFile(coder=CustomRowCoder())
+ fn = bqfl.WriteGroupedRecordsToFile(
+ schema=_ELEMENTS_SCHEMA, file_format=file_format)
self.tmpdir = self._new_tempdir()
def check_files_created(output_pc):
@@ -279,7 +306,7 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
file length is very small, so only a couple records fit in each file.
"""
fn = bqfl.WriteGroupedRecordsToFile(
- max_file_size=50, coder=CustomRowCoder())
+ schema=_ELEMENTS_SCHEMA, max_file_size=300)
self.tmpdir = self._new_tempdir()
def check_multiple_files(output_pc):
@@ -302,7 +329,6 @@ class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
self._consume_input(fn, _DESTINATION_ELEMENT_PAIRS, check_multiple_files)
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestPartitionFiles(unittest.TestCase):
_ELEMENTS = [(
@@ -375,7 +401,6 @@ class TestPartitionFiles(unittest.TestCase):
label='CheckSinglePartition')
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
def test_records_traverse_transform_with_mocks(self):
destination = 'project1:dataset1.table1'
@@ -401,7 +426,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
custom_gcs_temp_location=self._new_tempdir(),
test_client=bq_client,
validate=False,
- coder=CustomRowCoder())
+ temp_file_format=bigquery_tools.FileFormat.JSON)
# Need to test this with the DirectRunner to avoid serializing mocks
with TestPipeline('DirectRunner') as p:
@@ -534,7 +559,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
custom_gcs_temp_location=self._new_tempdir(),
test_client=bq_client,
validate=False,
- coder=CustomRowCoder(),
+ temp_file_format=bigquery_tools.FileFormat.JSON,
max_file_size=45,
max_partition_size=80,
max_files_per_partition=2))
@@ -580,7 +605,6 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
label='CheckCopyJobCount')
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryFileLoadsIT(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
@@ -771,6 +795,8 @@ class BigQueryFileLoadsIT(unittest.TestCase):
experiments='use_beam_bq_sink')
with self.assertRaises(Exception):
+ # The pipeline below fails because neither a schema nor SCHEMA_AUTODETECT
+ # are specified.
with beam.Pipeline(argv=args) as p:
input = p | beam.Create(_ELEMENTS)
input2 = p | "Broken record" >> beam.Create(['language_broken_record'])
@@ -783,7 +809,8 @@ class BigQueryFileLoadsIT(unittest.TestCase):
(output_table_1 if 'language' in x else output_table_2),
create_disposition=(
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
- write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+ write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+ temp_file_format=bigquery_tools.FileFormat.JSON))
hamcrest_assert(p, all_of(*pipeline_verifiers))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index b716822..b105e5e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -20,6 +20,7 @@
from __future__ import absolute_import
+import datetime
import decimal
import json
import logging
@@ -35,6 +36,7 @@ import uuid
import future.tests.base # pylint: disable=unused-import
import hamcrest as hc
import mock
+import pytz
from nose.plugins.attrib import attr
import apache_beam as beam
@@ -560,6 +562,28 @@ class TestWriteToBigQuery(unittest.TestCase):
beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
self.assertEqual(expected_dict_schema, dict_schema)
+ def test_schema_autodetect_not_allowed_with_avro_file_loads(self):
+ with TestPipeline(
+ additional_pipeline_args=["--experiments=use_beam_bq_sink"]) as p:
+ pc = p | beam.Impulse()
+
+ with self.assertRaisesRegex(ValueError, '^A schema must be provided'):
+ _ = (
+ pc
+ | beam.io.gcp.bigquery.WriteToBigQuery(
+ "dataset.table",
+ schema=None,
+ temp_file_format=bigquery_tools.FileFormat.AVRO))
+
+ with self.assertRaisesRegex(ValueError,
+ '^Schema auto-detection is not supported'):
+ _ = (
+ pc
+ | beam.io.gcp.bigquery.WriteToBigQuery(
+ "dataset.table",
+ schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
+ temp_file_format=bigquery_tools.FileFormat.AVRO))
+
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryStreamingInsertTransformTests(unittest.TestCase):
@@ -816,7 +840,8 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
str, '%s:%s' % (self.project, output_table_2)),
schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
additional_bq_parameters=lambda _: additional_bq_parameters,
- method='FILE_LOADS'))
+ method='FILE_LOADS',
+ temp_file_format=bigquery_tools.FileFormat.JSON))
@attr('IT')
def test_multiple_destinations_transform(self):
@@ -1025,6 +1050,104 @@ class PubSubBigQueryIT(unittest.TestCase):
WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)
+class BigQueryFileLoadsIntegrationTests(unittest.TestCase):
+ BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
+
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.runner_name = type(self.test_pipeline.runner).__name__
+ self.project = self.test_pipeline.get_option('project')
+
+ self.dataset_id = '%s%s%s' % (
+ self.BIG_QUERY_DATASET_ID,
+ str(int(time.time())),
+ random.randint(0, 10000))
+ self.bigquery_client = bigquery_tools.BigQueryWrapper()
+ self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+ self.output_table = '%s.output_table' % (self.dataset_id)
+ self.table_ref = bigquery_tools.parse_table_reference(self.output_table)
+ _LOGGER.info(
+ 'Created dataset %s in project %s', self.dataset_id, self.project)
+
+ @attr('IT')
+ def test_avro_file_load(self):
+ # Construct elements such that they can be written via Avro but not via
+ # JSON. See BEAM-8841.
+ elements = [
+ {
+ 'name': u'Negative infinity',
+ 'value': -float('inf'),
+ 'timestamp': datetime.datetime(1970, 1, 1, tzinfo=pytz.utc),
+ },
+ {
+ 'name': u'Not a number',
+ 'value': float('nan'),
+ 'timestamp': datetime.datetime(2930, 12, 9, tzinfo=pytz.utc),
+ },
+ ]
+
+ schema = beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(
+ bigquery.TableSchema(
+ fields=[
+ bigquery.TableFieldSchema(
+ name='name', type='STRING', mode='REQUIRED'),
+ bigquery.TableFieldSchema(
+ name='value', type='FLOAT', mode='REQUIRED'),
+ bigquery.TableFieldSchema(
+ name='timestamp', type='TIMESTAMP', mode='REQUIRED'),
+ ]))
+
+ pipeline_verifiers = [
+ # Some gymnastics here to avoid comparing NaN since NaN is not equal to
+ # anything, including itself.
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT name, value, timestamp FROM {} WHERE value<0".format(
+ self.output_table),
+ data=[(d['name'], d['value'], d['timestamp'])
+ for d in elements[:1]],
+ ),
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT name, timestamp FROM {}".format(self.output_table),
+ data=[(d['name'], d['timestamp']) for d in elements],
+ ),
+ ]
+
+ args = self.test_pipeline.get_full_options_as_args(
+ on_success_matcher=hc.all_of(*pipeline_verifiers),
+ experiments='use_beam_bq_sink',
+ )
+
+ with beam.Pipeline(argv=args) as p:
+ input = p | 'CreateInput' >> beam.Create(elements)
+ schema_pc = p | 'CreateSchema' >> beam.Create([schema])
+
+ _ = (
+ input
+ | 'WriteToBigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
+ table='%s:%s' % (self.project, self.output_table),
+ schema=lambda _,
+ schema: schema,
+ schema_side_inputs=(beam.pvalue.AsSingleton(schema_pc), ),
+ method='FILE_LOADS',
+ temp_file_format=bigquery_tools.FileFormat.AVRO,
+ ))
+
+ def tearDown(self):
+ request = bigquery.BigqueryDatasetsDeleteRequest(
+ projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
+ try:
+ _LOGGER.info(
+ "Deleting dataset %s in project %s", self.dataset_id, self.project)
+ self.bigquery_client.client.datasets.Delete(request)
+ except HttpError:
+ _LOGGER.debug(
+ 'Failed to clean up dataset %s in project %s',
+ self.dataset_id,
+ self.project)
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index f9edfa3..1ae4f7c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -31,6 +31,7 @@ from __future__ import absolute_import
import datetime
import decimal
+import io
import json
import logging
import re
@@ -39,13 +40,17 @@ import time
import uuid
from builtins import object
+import fastavro
from future.utils import iteritems
+from future.utils import raise_with_traceback
+from past.builtins import unicode
from apache_beam import coders
from apache_beam.internal.gcp import auth
from apache_beam.internal.gcp.json_value import from_json_value
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp import bigquery_avro_tools
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options import value_provider
from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -69,7 +74,7 @@ MAX_RETRIES = 3
JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
-class ExportFileFormat(object):
+class FileFormat(object):
CSV = 'CSV'
JSON = 'NEWLINE_DELIMITED_JSON'
AVRO = 'AVRO'
@@ -344,7 +349,8 @@ class BigQueryWrapper(object):
schema=None,
write_disposition=None,
create_disposition=None,
- additional_load_parameters=None):
+ additional_load_parameters=None,
+ source_format=None):
additional_load_parameters = additional_load_parameters or {}
job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -358,7 +364,8 @@ class BigQueryWrapper(object):
schema=job_schema,
writeDisposition=write_disposition,
createDisposition=create_disposition,
- sourceFormat='NEWLINE_DELIMITED_JSON',
+ sourceFormat=source_format,
+ useAvroLogicalTypes=True,
autodetect=schema == 'SCHEMA_AUTODETECT',
**additional_load_parameters)),
jobReference=reference,
@@ -656,7 +663,8 @@ class BigQueryWrapper(object):
schema=None,
write_disposition=None,
create_disposition=None,
- additional_load_parameters=None):
+ additional_load_parameters=None,
+ source_format=None):
"""Starts a job to load data into BigQuery.
Returns:
@@ -670,7 +678,8 @@ class BigQueryWrapper(object):
schema=schema,
create_disposition=create_disposition,
write_disposition=write_disposition,
- additional_load_parameters=additional_load_parameters)
+ additional_load_parameters=additional_load_parameters,
+ source_format=source_format)
@retry.with_exponential_backoff(
num_retries=MAX_RETRIES,
@@ -1171,6 +1180,104 @@ class RowAsDictJsonCoder(coders.Coder):
return json.loads(encoded_table_row.decode('utf-8'))
+class JsonRowWriter(io.IOBase):
+ """
+ A writer which provides an IOBase-like interface for writing table rows
+ (represented as dicts) as newline-delimited JSON strings.
+ """
+ def __init__(self, file_handle):
+ """Initialize an JsonRowWriter.
+
+ Args:
+ file_handle (io.IOBase): Output stream to write to.
+ """
+ if not file_handle.writable():
+ raise ValueError("Output stream must be writable")
+
+ self._file_handle = file_handle
+ self._coder = RowAsDictJsonCoder()
+
+ def close(self):
+ self._file_handle.close()
+
+ @property
+ def closed(self):
+ return self._file_handle.closed
+
+ def flush(self):
+ self._file_handle.flush()
+
+ def read(self, size=-1):
+ raise io.UnsupportedOperation("JsonRowWriter is not readable")
+
+ def tell(self):
+ return self._file_handle.tell()
+
+ def writable(self):
+ return self._file_handle.writable()
+
+ def write(self, row):
+ return self._file_handle.write(self._coder.encode(row) + b'\n')
+
+
+class AvroRowWriter(io.IOBase):
+ """
+ A writer which provides an IOBase-like interface for writing table rows
+ (represented as dicts) as Avro records.
+ """
+ def __init__(self, file_handle, schema):
+ """Initialize an AvroRowWriter.
+
+ Args:
+ file_handle (io.IOBase): Output stream to write Avro records to.
+ schema (Dict[Text, Any]): BigQuery table schema.
+ """
+ if not file_handle.writable():
+ raise ValueError("Output stream must be writable")
+
+ self._file_handle = file_handle
+ avro_schema = fastavro.parse_schema(
+ get_avro_schema_from_table_schema(schema))
+ self._avro_writer = fastavro.write.Writer(self._file_handle, avro_schema)
+
+ def close(self):
+ if not self._file_handle.closed:
+ self.flush()
+ self._file_handle.close()
+
+ @property
+ def closed(self):
+ return self._file_handle.closed
+
+ def flush(self):
+ if self._file_handle.closed:
+ raise ValueError("flush on closed file")
+
+ self._avro_writer.flush()
+ self._file_handle.flush()
+
+ def read(self, size=-1):
+ raise io.UnsupportedOperation("AvroRowWriter is not readable")
+
+ def tell(self):
+ # Flush the fastavro Writer to the underlying stream, otherwise there isn't
+ # a reliable way to determine how many bytes have been written.
+ self._avro_writer.flush()
+ return self._file_handle.tell()
+
+ def writable(self):
+ return self._file_handle.writable()
+
+ def write(self, row):
+ try:
+ self._avro_writer.write(row)
+ except (TypeError, ValueError) as ex:
+ raise_with_traceback(
+ ex.__class__(
+ "Error writing row to Avro: {}\nSchema: {}\nRow: {}".format(
+ ex, self._avro_writer.schema, row)))
+
+
class RetryStrategy(object):
RETRY_ALWAYS = 'RETRY_ALWAYS'
RETRY_NEVER = 'RETRY_NEVER'
@@ -1221,3 +1328,97 @@ class AppendDestinationsFn(DoFn):
def process(self, element, *side_inputs):
yield (self.destination(element, *side_inputs), element)
+
+
+def get_table_schema_from_string(schema):
+ """Transform the string table schema into a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` instance.
+
+ Args:
+ schema (str): The string schema to be used if the BigQuery table to write
+ has to be created.
+
+ Returns:
+ ~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema:
+ The schema to be used if the BigQuery table to write has to be created
+ but in the :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` format.
+ """
+ table_schema = bigquery.TableSchema()
+ schema_list = [s.strip() for s in schema.split(',')]
+ for field_and_type in schema_list:
+ field_name, field_type = field_and_type.split(':')
+ field_schema = bigquery.TableFieldSchema()
+ field_schema.name = field_name
+ field_schema.type = field_type
+ field_schema.mode = 'NULLABLE'
+ table_schema.fields.append(field_schema)
+ return table_schema
+
+
+def table_schema_to_dict(table_schema):
+ """Create a dictionary representation of table schema for serialization
+ """
+ def get_table_field(field):
+ """Create a dictionary representation of a table field
+ """
+ result = {}
+ result['name'] = field.name
+ result['type'] = field.type
+ result['mode'] = getattr(field, 'mode', 'NULLABLE')
+ if hasattr(field, 'description') and field.description is not None:
+ result['description'] = field.description
+ if hasattr(field, 'fields') and field.fields:
+ result['fields'] = [get_table_field(f) for f in field.fields]
+ return result
+
+ if not isinstance(table_schema, bigquery.TableSchema):
+ raise ValueError("Table schema must be of the type bigquery.TableSchema")
+ schema = {'fields': []}
+ for field in table_schema.fields:
+ schema['fields'].append(get_table_field(field))
+ return schema
+
+
+def get_dict_table_schema(schema):
+ """Transform the table schema into a dictionary instance.
+
+ Args:
+ schema (str, dict, ~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema):
+ The schema to be used if the BigQuery table to write has to be created.
+ This can either be a dict or string or in the TableSchema format.
+
+ Returns:
+ Dict[str, Any]: The schema to be used if the BigQuery table to write has
+ to be created but in the dictionary format.
+ """
+ if (isinstance(schema, (dict, value_provider.ValueProvider)) or
+ callable(schema) or schema is None):
+ return schema
+ elif isinstance(schema, (str, unicode)):
+ table_schema = get_table_schema_from_string(schema)
+ return table_schema_to_dict(table_schema)
+ elif isinstance(schema, bigquery.TableSchema):
+ return table_schema_to_dict(schema)
+ else:
+ raise TypeError('Unexpected schema argument: %s.' % schema)
+
+
+def get_avro_schema_from_table_schema(schema):
+ """Transform the table schema into an Avro schema.
+
+ Args:
+ schema (str, dict, ~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema):
+ The TableSchema to convert to Avro schema. This can either be a dict or
+ string or in the TableSchema format.
+
+ Returns:
+ Dict[str, Any]: An Avro schema, which can be used by fastavro.
+ """
+ dict_table_schema = get_dict_table_schema(schema)
+ return bigquery_avro_tools.get_record_schema_from_dict_table_schema(
+ "root", dict_table_schema)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 5f71434..9c874fa 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -21,15 +21,19 @@ from __future__ import absolute_import
import datetime
import decimal
+import io
import json
import logging
+import math
import re
import time
import unittest
+import fastavro
# patches unittest.TestCase to be python3 compatible
import future.tests.base # pylint: disable=unused-import,ungrouped-imports
import mock
+import pytz
from future.utils import iteritems
import apache_beam as beam
@@ -37,6 +41,8 @@ from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
from apache_beam.io.gcp.bigquery_test import HttpError
from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
+from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
+from apache_beam.io.gcp.bigquery_tools import JsonRowWriter
from apache_beam.io.gcp.bigquery_tools import RowAsDictJsonCoder
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
from apache_beam.io.gcp.internal.clients import bigquery
@@ -745,6 +751,67 @@ class TestRowAsDictJsonCoder(unittest.TestCase):
self.json_compliance_exception(float('-inf'))
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestJsonRowWriter(unittest.TestCase):
+ def test_write_row(self):
+ rows = [
+ {
+ 'name': 'beam', 'game': 'dream'
+ },
+ {
+ 'name': 'team', 'game': 'cream'
+ },
+ ]
+
+ with io.BytesIO() as buf:
+ # Mock close() so we can access the buffer contents
+ # after JsonRowWriter is closed.
+ with mock.patch.object(buf, 'close') as mock_close:
+ writer = JsonRowWriter(buf)
+ for row in rows:
+ writer.write(row)
+ writer.close()
+
+ mock_close.assert_called_once()
+
+ buf.seek(0)
+ read_rows = [
+ json.loads(row)
+ for row in buf.getvalue().strip().decode('utf-8').split('\n')
+ ]
+
+ self.assertEqual(read_rows, rows)
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestAvroRowWriter(unittest.TestCase):
+ def test_write_row(self):
+ schema = bigquery.TableSchema(
+ fields=[
+ bigquery.TableFieldSchema(name='stamp', type='TIMESTAMP'),
+ bigquery.TableFieldSchema(
+ name='number', type='FLOAT', mode='REQUIRED'),
+ ])
+ stamp = datetime.datetime(2020, 2, 25, 12, 0, 0, tzinfo=pytz.utc)
+
+ with io.BytesIO() as buf:
+ # Mock close() so we can access the buffer contents
+ # after AvroRowWriter is closed.
+ with mock.patch.object(buf, 'close') as mock_close:
+ writer = AvroRowWriter(buf, schema)
+ writer.write({'stamp': stamp, 'number': float('NaN')})
+ writer.close()
+
+ mock_close.assert_called_once()
+
+ buf.seek(0)
+ records = [r for r in fastavro.reader(buf)]
+
+ self.assertEqual(len(records), 1)
+ self.assertTrue(math.isnan(records[0]['number']))
+ self.assertEqual(records[0]['stamp'], stamp)
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 14009f7..9ea75ea 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -37,6 +37,7 @@ from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.bigquery_tools import FileFormat
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.testing.test_pipeline import TestPipeline
@@ -208,7 +209,8 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
+ write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+ temp_file_format=FileFormat.JSON))
@attr('IT')
def test_big_query_write_new_types(self):
@@ -350,7 +352,8 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
p | 'create' >> beam.Create(input_data)
| 'write' >> beam.io.WriteToBigQuery(
table_id,
- write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+ write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+ temp_file_format=FileFormat.JSON))
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index c248df4..02c96cb 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -21,6 +21,7 @@
from __future__ import absolute_import
+import io
import os
import shutil
from builtins import zip
@@ -139,7 +140,7 @@ class LocalFileSystem(FileSystem):
"""Helper functions to open a file in the provided mode.
"""
compression_type = FileSystem._get_compression_type(path, compression_type)
- raw_file = open(path, mode)
+ raw_file = io.open(path, mode)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
else:
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index f41b30e..6d6df82 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -140,6 +140,7 @@ ignore_identifiers = [
'Iterable',
'List',
'Set',
+ 'Text',
'Tuple',
# Ignore broken built-in type references
diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh
index cb0ce31..386bb42 100755
--- a/sdks/python/scripts/run_integration_test.sh
+++ b/sdks/python/scripts/run_integration_test.sh
@@ -203,6 +203,7 @@ if [[ -z $PIPELINE_OPTS ]]; then
# See: https://github.com/hamcrest/PyHamcrest/issues/131.
echo "pyhamcrest!=1.10.0,<2.0.0" > postcommit_requirements.txt
echo "mock<3.0.0" >> postcommit_requirements.txt
+ echo "parameterized>=0.7.1,<0.8.0" >> postcommit_requirements.txt
# Options used to run testing pipeline on Cloud Dataflow Service. Also used for
# running on DirectRunner (some options ignored).