Posted to commits@beam.apache.org by pa...@apache.org on 2020/06/16 17:55:10 UTC
[beam] branch master updated: Merge pull request #11086 from [BEAM-8910] Make custom BQ source read from Avro
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dfc4bde Merge pull request #11086 from [BEAM-8910] Make custom BQ source read from Avro
dfc4bde is described below
commit dfc4bde1837a020d723801e39d7177231f49168c
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Tue Jun 16 10:54:45 2020 -0700
Merge pull request #11086 from [BEAM-8910] Make custom BQ source read from Avro
* Make custom BQ source read from Avro
* Using avro logical types
* Working with test
* Matching appropriately
* Separate test behavior
* Support Backwards Compatible output
* Fix formatter
* Fixup
* Fixup
* Addressing comments
* Addressing comments
* Addressing comments. Documenting. Nits
* Keeping new_types test for JSON
* Fixup
---
CHANGES.md | 4 +
.../io/gcp/big_query_query_to_table_it_test.py | 31 ++++++-
.../io/gcp/big_query_query_to_table_pipeline.py | 9 ++
sdks/python/apache_beam/io/gcp/bigquery.py | 95 +++++++++++++++-------
.../apache_beam/io/gcp/bigquery_read_it_test.py | 29 +++++--
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 11 ++-
6 files changed, 142 insertions(+), 37 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 13a4986..246d385 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,10 @@
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Support for reading from Snowflake added (Java) ([BEAM-9722](https://issues.apache.org/jira/browse/BEAM-9722)).
* Support for writing to Splunk added (Java) ([BEAM-8596](https://issues.apache.org/jira/browse/BEAM-8596)).
+* A new transform to read from BigQuery has been added: `apache_beam.io.gcp.bigquery.ReadFromBigQuery`. This transform
+ is experimental. It reads data from BigQuery by exporting it to Avro files and then reading those files. It also
+ supports reading by exporting to JSON files, which differs slightly in behavior for time- and date-related fields. See
+ the Pydoc for more information.
## New Features / Improvements
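For context, a minimal usage sketch of the new transform (the table, query, and GCS bucket names here are hypothetical; the query, use_standard_sql, and gcs_location parameters all appear in the bigquery.py diff below):

    import apache_beam as beam

    with beam.Pipeline() as p:
        rows = (
            p
            | 'read' >> beam.io.ReadFromBigQuery(
                query='SELECT name, year FROM my_dataset.my_table',  # hypothetical
                use_standard_sql=True,
                gcs_location='gs://my-bucket/tmp')  # hypothetical temp location
            | 'print' >> beam.Map(print))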
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 82ac896..1f39928 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -173,7 +173,8 @@ class BigQueryQueryToTableIT(unittest.TestCase):
'output_schema': DIALECT_OUTPUT_SCHEMA,
'use_standard_sql': False,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
- 'on_success_matcher': all_of(*pipeline_verifiers)
+ 'on_success_matcher': all_of(*pipeline_verifiers),
+ 'experiments': 'use_beam_bq_sink',
}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -196,7 +197,8 @@ class BigQueryQueryToTableIT(unittest.TestCase):
'output_schema': DIALECT_OUTPUT_SCHEMA,
'use_standard_sql': True,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
- 'on_success_matcher': all_of(*pipeline_verifiers)
+ 'on_success_matcher': all_of(*pipeline_verifiers),
+ 'experiments': 'use_beam_bq_sink',
}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -254,12 +256,37 @@ class BigQueryQueryToTableIT(unittest.TestCase):
'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
'use_standard_sql': False,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
+ 'use_json_exports': True,
'on_success_matcher': all_of(*pipeline_verifiers)
}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
@attr('IT')
+ def test_big_query_new_types_avro(self):
+ expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
+ verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
+ pipeline_verifiers = [
+ PipelineStateMatcher(),
+ BigqueryMatcher(
+ project=self.project,
+ query=verify_query,
+ checksum=expected_checksum)
+ ]
+ self._setup_new_types_env()
+ extra_opts = {
+ 'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
+ 'output': self.output_table,
+ 'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
+ 'use_standard_sql': False,
+ 'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
+ 'on_success_matcher': all_of(*pipeline_verifiers),
+ 'experiments': 'use_beam_bq_sink',
+ }
+ options = self.test_pipeline.get_full_options_as_args(**extra_opts)
+ big_query_query_to_table_pipeline.run_bq_pipeline(options)
+
+ @attr('IT')
def test_big_query_new_types_native(self):
expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
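The tests above opt into the new BigQuery sink through the use_beam_bq_sink experiment. Outside the test harness, the same flag is an ordinary pipeline option; a small sketch (only the flag name comes from this diff, the rest is illustrative):

    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.options.pipeline_options import PipelineOptions

    # Experiments are plain pipeline options passed as --experiments=<name>.
    options = PipelineOptions(['--experiments=use_beam_bq_sink'])
    assert 'use_beam_bq_sink' in (options.view_as(DebugOptions).experiments or [])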
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index 200a1c9..50cd584 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -63,6 +63,11 @@ def run_bq_pipeline(argv=None):
default=False,
action='store_true',
help='Use NativeSources and Sinks.')
+ parser.add_argument(
+ '--use_json_exports',
+ default=False,
+ action='store_true',
+ help='Use JSON as the file format for exports.')
known_args, pipeline_args = parser.parse_known_args(argv)
table_schema = parse_table_schema_from_json(known_args.output_schema)
@@ -84,6 +89,7 @@ def run_bq_pipeline(argv=None):
query=known_args.query,
project=options.view_as(GoogleCloudOptions).project,
use_standard_sql=known_args.use_standard_sql,
+ use_json_exports=known_args.use_json_exports,
kms_key=kms_key)
if known_args.native:
_ = data | 'write' >> beam.io.Write(
@@ -94,11 +100,14 @@ def run_bq_pipeline(argv=None):
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
kms_key=kms_key))
else:
+ temp_file_format = (
+ 'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
_ = data | 'write' >> beam.io.WriteToBigQuery(
known_args.output,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+ temp_file_format=temp_file_format,
kms_key=kms_key)
result = p.run()
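The pipeline above derives the sink's temporary file format from the new --use_json_exports flag. A minimal sketch of the same choice in user code (the table spec and schema are hypothetical; temp_file_format is the parameter added in this diff):

    import apache_beam as beam

    def write_rows(rows, use_json_exports=False):
        # JSON keeps output compatible with the JSON-based tests; Avro is
        # the default everywhere else.
        temp_file_format = (
            'NEWLINE_DELIMITED_JSON' if use_json_exports else 'AVRO')
        return rows | 'write' >> beam.io.WriteToBigQuery(
            'my-project:my_dataset.my_table',  # hypothetical table spec
            schema='name:STRING,year:INTEGER',  # hypothetical schema
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
            temp_file_format=temp_file_format)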
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9f7c0c9..b51fb94 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -45,8 +45,8 @@ call *one* row of the main table and *all* rows of the side table. The runner
may use some caching techniques to share the side inputs between calls in order
to avoid excessive reading:::
- main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource()
- side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource()
+ main_table = pipeline | 'VeryBig' >> beam.io.ReadFromBigQuery(...)
+ side_table = pipeline | 'NotBig' >> beam.io.ReadFromBigQuery(...)
results = (
main_table
| 'ProcessData' >> beam.Map(
@@ -58,14 +58,8 @@ as a parameter to the Map transform. AsList signals to the execution framework
that its input should be made available whole.
The main and side inputs are implemented differently. Reading a BigQuery table
-as main input entails exporting the table to a set of GCS files (currently in
-JSON format) and then processing those files. Reading the same table as a side
-input entails querying the table for all its rows. The coder argument on
-BigQuerySource controls the reading of the lines in the export files (i.e.,
-transform a JSON object into a PCollection element). The coder is not involved
-when the same table is read as a side input since there is no intermediate
-format involved. We get the table rows directly from the BigQuery service with
-a query.
+as main input entails exporting the table to a set of GCS files (in Avro or
+JSON format) and then processing those files.
Users may provide a query to read from rather than reading all of a BigQuery
table. If specified, the result obtained by executing the specified query will
@@ -78,6 +72,12 @@ When creating a BigQuery input transform, users should provide either a query
or a table. Pipeline construction will fail with a validation error if neither
or both are specified.
+When reading from BigQuery using `BigQuerySource`, bytes are returned as
+base64-encoded bytes. When reading via `ReadFromBigQuery`, bytes are returned
+as bytes without base64 encoding. This is because ReadFromBigQuery uses Avro
+exports by default. To get base64-encoded bytes, pass the `use_json_exports`
+flag to export the data as JSON instead.
+
Writing Data to BigQuery
========================
@@ -225,8 +225,7 @@ The GEOGRAPHY data type works with Well-Known Text (See
https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing
to BigQuery.
BigQuery IO requires values of BYTES datatype to be encoded using base64
-encoding when writing to BigQuery. When bytes are read from BigQuery they are
-returned as base64-encoded bytes.
+encoding when writing to BigQuery.
"""
# pytype: skip-file
@@ -251,6 +250,7 @@ from apache_beam import coders
from apache_beam import pvalue
from apache_beam.internal.gcp.json_value import from_json_value
from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io.avroio import _create_avro_source as create_avro_source
from apache_beam.io.filesystems import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import bigquery_tools
@@ -609,7 +609,8 @@ class _CustomBigQuerySource(BoundedSource):
use_standard_sql=False,
flatten_results=True,
kms_key=None,
- bigquery_job_labels=None):
+ bigquery_job_labels=None,
+ use_json_exports=False):
if table is not None and query is not None:
raise ValueError(
'Both a BigQuery table and a query were specified.'
@@ -638,14 +639,17 @@ class _CustomBigQuerySource(BoundedSource):
self.split_result = None
self.options = pipeline_options
self.bigquery_job_labels = bigquery_job_labels or {}
+ self.use_json_exports = use_json_exports
def display_data(self):
+ export_format = 'JSON' if self.use_json_exports else 'AVRO'
return {
'table': str(self.table_reference),
'query': str(self.query),
'project': str(self.project),
'use_legacy_sql': self.use_legacy_sql,
'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
+ 'export_file_format': export_format,
}
def estimate_size(self):
@@ -691,6 +695,17 @@ class _CustomBigQuerySource(BoundedSource):
project = self.project
return project
+ def _create_source(self, path, schema):
+ if not self.use_json_exports:
+ return create_avro_source(path, use_fastavro=True)
+ else:
+ return TextSource(
+ path,
+ min_bundle_size=0,
+ compression_type=CompressionTypes.UNCOMPRESSED,
+ strip_trailing_newlines=True,
+ coder=self.coder(schema))
+
def split(self, desired_bundle_size, start_position=None, stop_position=None):
if self.split_result is None:
bq = bigquery_tools.BigQueryWrapper()
@@ -701,13 +716,10 @@ class _CustomBigQuerySource(BoundedSource):
schema, metadata_list = self._export_files(bq)
self.split_result = [
- TextSource(
- metadata.path,
- 0,
- CompressionTypes.UNCOMPRESSED,
- True,
- self.coder(schema)) for metadata in metadata_list
+ self._create_source(metadata.path, schema)
+ for metadata in metadata_list
]
+
if self.query is not None:
bq.clean_up_temporary_dataset(self._get_project())
@@ -755,13 +767,23 @@ class _CustomBigQuerySource(BoundedSource):
bigquery.TableSchema instance, a list of FileMetadata instances
"""
job_id = uuid.uuid4().hex
- job_ref = bq.perform_extract_job([self.gcs_location],
- job_id,
- self.table_reference,
- bigquery_tools.FileFormat.JSON,
- project=self._get_project(),
- include_header=False,
- job_labels=self.bigquery_job_labels)
+ if self.use_json_exports:
+ job_ref = bq.perform_extract_job([self.gcs_location],
+ job_id,
+ self.table_reference,
+ bigquery_tools.FileFormat.JSON,
+ project=self._get_project(),
+ job_labels=self.bigquery_job_labels,
+ include_header=False)
+ else:
+ job_ref = bq.perform_extract_job([self.gcs_location],
+ job_id,
+ self.table_reference,
+ bigquery_tools.FileFormat.AVRO,
+ project=self._get_project(),
+ include_header=False,
+ job_labels=self.bigquery_job_labels,
+ use_avro_logical_types=True)
bq.wait_for_bq_job(job_ref)
metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
@@ -1627,7 +1649,8 @@ class ReadFromBigQuery(PTransform):
"""Read data from BigQuery.
This PTransform uses a BigQuery export job to take a snapshot of the table
- on GCS, and then reads from each produced JSON file.
+ on GCS, and then reads from each produced file. The file format is Avro by
+ default.
Args:
table (str, callable, ValueProvider): The ID of the table, or a callable
@@ -1671,7 +1694,23 @@ class ReadFromBigQuery(PTransform):
to BigQuery export and query jobs created by this transform. See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/\
Job#JobConfiguration
- """
+ use_json_exports (bool): By default, this transform works by exporting
+ BigQuery data into Avro files, and reading those files. With this
+ parameter, the transform will instead export to JSON files. JSON files
+ are slower to read due to their larger size.
+ When using JSON exports, the BigQuery types for DATE, DATETIME, TIME, and
+ TIMESTAMP will be exported as strings. This behavior is consistent with
+ BigQuerySource.
+ When using Avro exports, these fields will be exported as native Python
+ types (datetime.date, datetime.datetime, datetime.time,
+ and datetime.datetime respectively). Avro exports are recommended.
+ To learn more about BigQuery types, and Time-related type
+ representations, see: https://cloud.google.com/bigquery/docs/reference/\
+ standard-sql/data-types
+ To learn more about type conversions between BigQuery and Avro, see:
+ https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro\
+ #avro_conversions
+ """
def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
if gcs_location:
if not isinstance(gcs_location, (str, unicode, ValueProvider)):
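Per the docstring changes above, the export format determines how date- and time-typed fields arrive. A hedged sketch of the two modes (table name hypothetical):

    import apache_beam as beam

    with beam.Pipeline() as p:
        # Default (Avro exports): DATE, DATETIME, TIME, and TIMESTAMP fields
        # arrive as datetime.date / datetime.datetime / datetime.time objects.
        avro_rows = p | 'read avro' >> beam.io.ReadFromBigQuery(
            table='my-project:my_dataset.my_table')  # hypothetical

        # JSON exports: the same fields arrive as strings, matching the
        # older BigQuerySource behavior.
        json_rows = p | 'read json' >> beam.io.ReadFromBigQuery(
            table='my-project:my_dataset.my_table',  # hypothetical
            use_json_exports=True)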
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 50ebcdc..ce82b31 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -23,6 +23,7 @@
from __future__ import absolute_import
import base64
+import datetime
import logging
import random
import time
@@ -70,6 +71,18 @@ def skip(runners):
return inner
+def datetime_to_utc(element):
+ for k, v in element.items():
+ if isinstance(v, (datetime.time, datetime.date)):
+ element[k] = str(v)
+ if isinstance(v, datetime.datetime) and v.tzinfo:
+ # For timezone-aware datetimes, normalize to a UTC string.
+ offset = v.utcoffset()
+ utc_dt = (v - offset).strftime('%Y-%m-%d %H:%M:%S.%f UTC')
+ element[k] = utc_dt
+ return element
+
+
class BigQueryReadIntegrationTests(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_read_table_'
@@ -238,11 +251,12 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
cls.bigquery_client.insert_rows(
cls.project, cls.dataset_id, table_name, table_data)
- def get_expected_data(self):
+ def get_expected_data(self, native=True):
+ byts = b'\xab\xac'
expected_row = {
'float': 0.33,
'numeric': Decimal('10'),
- 'bytes': base64.b64encode(b'\xab\xac'),
+ 'bytes': base64.b64encode(byts) if native else byts,
'date': '3000-12-31',
'time': '23:59:59',
'datetime': '2018-12-31T12:44:31',
@@ -265,7 +279,8 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
def test_native_source(self):
with beam.Pipeline(argv=self.args) as p:
result = (
- p | 'read' >> beam.io.Read(
+ p
+ | 'read' >> beam.io.Read(
beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
assert_that(result, equal_to(self.get_expected_data()))
@@ -273,12 +288,14 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
def test_iobase_source(self):
with beam.Pipeline(argv=self.args) as p:
result = (
- p | 'read' >> beam.io.ReadFromBigQuery(
+ p
+ | 'read' >> beam.io.ReadFromBigQuery(
query=self.query,
use_standard_sql=True,
project=self.project,
- bigquery_job_labels={'launcher': 'apache_beam_tests'}))
- assert_that(result, equal_to(self.get_expected_data()))
+ bigquery_job_labels={'launcher': 'apache_beam_tests'})
+ | beam.Map(datetime_to_utc))
+ assert_that(result, equal_to(self.get_expected_data(native=False)))
if __name__ == '__main__':
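To make the matcher comparison concrete, here is a standalone sketch of what the datetime_to_utc helper above does to a row; the helper body is copied from the diff, and the sample values assume Python 3's datetime.timezone:

    import datetime

    def datetime_to_utc(element):
        for k, v in element.items():
            if isinstance(v, (datetime.time, datetime.date)):
                element[k] = str(v)
            if isinstance(v, datetime.datetime) and v.tzinfo:
                # For timezone-aware datetimes, normalize to a UTC string.
                offset = v.utcoffset()
                utc_dt = (v - offset).strftime('%Y-%m-%d %H:%M:%S.%f UTC')
                element[k] = utc_dt
        return element

    row = {
        'time': datetime.time(23, 59, 59),
        'timestamp': datetime.datetime(
            2018, 12, 31, 12, 44, 31, tzinfo=datetime.timezone.utc),
    }
    print(datetime_to_utc(row))
    # {'time': '23:59:59', 'timestamp': '2018-12-31 12:44:31.000000 UTC'}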
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index b036dc0..12c5dc0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -95,6 +95,12 @@ def default_encoder(obj):
# on python 3 base64-encoded bytes are decoded to strings
# before being sent to BigQuery
return obj.decode('utf-8')
+ elif isinstance(obj, (datetime.date, datetime.time)):
+ return str(obj)
+ elif isinstance(obj, datetime.datetime):
+ return obj.isoformat()
+
+ _LOGGER.error("Unable to serialize %r to JSON", obj)
raise TypeError(
"Object of type '%s' is not JSON serializable" % type(obj).__name__)
@@ -720,6 +726,7 @@ class BigQueryWrapper(object):
project=None,
include_header=True,
compression=ExportCompression.NONE,
+ use_avro_logical_types=False,
job_labels=None):
"""Starts a job to export data from BigQuery.
@@ -738,6 +745,7 @@ class BigQueryWrapper(object):
printHeader=include_header,
destinationFormat=destination_format,
compression=compression,
+ useAvroLogicalTypes=use_avro_logical_types,
),
labels=_build_job_labels(job_labels),
),
@@ -1205,7 +1213,8 @@ class RowAsDictJsonCoder(coders.Coder):
return json.dumps(
table_row, allow_nan=False, default=default_encoder).encode('utf-8')
except ValueError as e:
- raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
+ raise ValueError(
+ '%s. %s. Row: %r' % (e, JSON_COMPLIANCE_ERROR, table_row))
def decode(self, encoded_table_row):
return json.loads(encoded_table_row.decode('utf-8'))
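Finally, the default_encoder change above is what lets RowAsDictJsonCoder serialize the date and time objects that Avro-based reads now produce. A standalone sketch of the effect (note the checks are reordered here so DATETIME values take the isoformat path, since datetime.datetime is a subclass of datetime.date):

    import datetime
    import json

    def encode_value(obj):
        # Order matters: datetime.datetime subclasses datetime.date, so it
        # must be tested first to get isoformat() rather than str().
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, (datetime.date, datetime.time)):
            return str(obj)
        raise TypeError(
            "Object of type '%s' is not JSON serializable" % type(obj).__name__)

    row = {'date': datetime.date(3000, 12, 31),
           'ts': datetime.datetime(2018, 12, 31, 12, 44, 31)}
    print(json.dumps(row, default=encode_value))
    # {"date": "3000-12-31", "ts": "2018-12-31T12:44:31"}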