Posted to commits@beam.apache.org by pa...@apache.org on 2020/06/16 17:55:10 UTC

[beam] branch master updated: Merge pull request #11086 from [BEAM-8910] Make custom BQ source read from Avro

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dfc4bde  Merge pull request #11086 from [BEAM-8910] Make custom BQ source read from Avro
dfc4bde is described below

commit dfc4bde1837a020d723801e39d7177231f49168c
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Tue Jun 16 10:54:45 2020 -0700

    Merge pull request #11086 from [BEAM-8910] Make custom BQ source read from Avro
    
    * Make custom BQ source read from Avro
    
    * Using avro logical types
    
    * Working with test
    
    * Matching appropriately
    
    * Separate test behavior
    
    * Support Backwards Compatible output
    
    * Fix formatter
    
    * Fixup
    
    * Fixup
    
    * Addressing comments
    
    * Addressing comments
    
    * Addressing comments. Documenting. Nits
    
    * Keeping new_types test for JSON
    
    * Fixup
---
 CHANGES.md                                         |  4 +
 .../io/gcp/big_query_query_to_table_it_test.py     | 31 ++++++-
 .../io/gcp/big_query_query_to_table_pipeline.py    |  9 ++
 sdks/python/apache_beam/io/gcp/bigquery.py         | 95 +++++++++++++++-------
 .../apache_beam/io/gcp/bigquery_read_it_test.py    | 29 +++++--
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 11 ++-
 6 files changed, 142 insertions(+), 37 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 13a4986..246d385 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,10 @@
 * Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Support for reading from Snowflake added (Java) ([BEAM-9722](https://issues.apache.org/jira/browse/BEAM-9722)).
 * Support for writing to Splunk added (Java) ([BEAM-8596](https://issues.apache.org/jira/browse/BEAM-8596)).
+* A new transform to read from BigQuery has been added: `apache_beam.io.gcp.bigquery.ReadFromBigQuery`. This transform
+  is experimental. It reads data from BigQuery by exporting it to Avro files and then reading those files. It also
+  supports reading by exporting to JSON files, which has small differences in behavior for Time- and Date-related
+  fields. See Pydoc for more information.
 
 ## New Features / Improvements
 
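As a quick illustration of the new transform described in the CHANGES entry above, a minimal read pipeline might look like the following sketch. The project, dataset, table, and bucket names are placeholders, not values from this change.

  # Minimal sketch of the new experimental transform; names are placeholders.
  import apache_beam as beam

  with beam.Pipeline() as p:
    rows = (
        p
        | 'Read' >> beam.io.ReadFromBigQuery(
            table='my-project:my_dataset.my_table',
            # Staging location on GCS for the temporary export files.
            gcs_location='gs://my-bucket/tmp'))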
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 82ac896..1f39928 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -173,7 +173,8 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         'output_schema': DIALECT_OUTPUT_SCHEMA,
         'use_standard_sql': False,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
-        'on_success_matcher': all_of(*pipeline_verifiers)
+        'on_success_matcher': all_of(*pipeline_verifiers),
+        'experiments': 'use_beam_bq_sink',
     }
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -196,7 +197,8 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         'output_schema': DIALECT_OUTPUT_SCHEMA,
         'use_standard_sql': True,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
-        'on_success_matcher': all_of(*pipeline_verifiers)
+        'on_success_matcher': all_of(*pipeline_verifiers),
+        'experiments': 'use_beam_bq_sink',
     }
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
@@ -254,12 +256,37 @@ class BigQueryQueryToTableIT(unittest.TestCase):
         'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
         'use_standard_sql': False,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
+        'use_json_exports': True,
         'on_success_matcher': all_of(*pipeline_verifiers)
     }
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
   @attr('IT')
+  def test_big_query_new_types_avro(self):
+    expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
+    verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
+    pipeline_verifiers = [
+        PipelineStateMatcher(),
+        BigqueryMatcher(
+            project=self.project,
+            query=verify_query,
+            checksum=expected_checksum)
+    ]
+    self._setup_new_types_env()
+    extra_opts = {
+        'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
+        'output': self.output_table,
+        'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
+        'use_standard_sql': False,
+        'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
+        'on_success_matcher': all_of(*pipeline_verifiers),
+        'experiments': 'use_beam_bq_sink',
+    }
+    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
+    big_query_query_to_table_pipeline.run_bq_pipeline(options)
+
+  @attr('IT')
   def test_big_query_new_types_native(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
     verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index 200a1c9..50cd584 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -63,6 +63,11 @@ def run_bq_pipeline(argv=None):
       default=False,
       action='store_true',
       help='Use NativeSources and Sinks.')
+  parser.add_argument(
+      '--use_json_exports',
+      default=False,
+      action='store_true',
+      help='Use JSON as the file format for exports.')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   table_schema = parse_table_schema_from_json(known_args.output_schema)
@@ -84,6 +89,7 @@ def run_bq_pipeline(argv=None):
         query=known_args.query,
         project=options.view_as(GoogleCloudOptions).project,
         use_standard_sql=known_args.use_standard_sql,
+        use_json_exports=known_args.use_json_exports,
         kms_key=kms_key)
   if known_args.native:
     _ = data | 'write' >> beam.io.Write(
@@ -94,11 +100,14 @@ def run_bq_pipeline(argv=None):
             write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
             kms_key=kms_key))
   else:
+    temp_file_format = (
+        'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
     _ = data | 'write' >> beam.io.WriteToBigQuery(
         known_args.output,
         schema=table_schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+        temp_file_format=temp_file_format,
         kms_key=kms_key)
 
   result = p.run()
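A user pipeline could thread an export-format choice through the read and write sides in the same way the test pipeline above does. The following sketch mirrors that pattern; the flag, query, table, and schema are illustrative only.

  # Sketch of threading an export-format flag through read and write,
  # mirroring the --use_json_exports flag added above. Names are illustrative.
  import argparse

  import apache_beam as beam

  parser = argparse.ArgumentParser()
  parser.add_argument('--use_json_exports', default=False, action='store_true')
  known_args, pipeline_args = parser.parse_known_args()

  temp_file_format = (
      'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')

  with beam.Pipeline(argv=pipeline_args) as p:
    _ = (
        p
        | 'Read' >> beam.io.ReadFromBigQuery(
            query='SELECT name, value FROM my_dataset.my_table',
            use_standard_sql=True,
            use_json_exports=known_args.use_json_exports)
        | 'Write' >> beam.io.WriteToBigQuery(
            'my_dataset.output_table',
            schema='name:STRING,value:INTEGER',
            temp_file_format=temp_file_format))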
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9f7c0c9..b51fb94 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -45,8 +45,8 @@ call *one* row of the main table and *all* rows of the side table. The runner
 may use some caching techniques to share the side inputs between calls in order
 to avoid excessive reading:::
 
-  main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource()
-  side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource()
+  main_table = pipeline | 'VeryBig' >> beam.io.ReadFromBigQuery(...)
+  side_table = pipeline | 'NotBig' >> beam.io.ReadFromBigQuery(...)
   results = (
       main_table
       | 'ProcessData' >> beam.Map(
@@ -58,14 +58,8 @@ as a parameter to the Map transform. AsList signals to the execution framework
 that its input should be made available whole.
 
 The main and side inputs are implemented differently. Reading a BigQuery table
-as main input entails exporting the table to a set of GCS files (currently in
-JSON format) and then processing those files. Reading the same table as a side
-input entails querying the table for all its rows. The coder argument on
-BigQuerySource controls the reading of the lines in the export files (i.e.,
-transform a JSON object into a PCollection element). The coder is not involved
-when the same table is read as a side input since there is no intermediate
-format involved. We get the table rows directly from the BigQuery service with
-a query.
+as main input entails exporting the table to a set of GCS files (in Avro or
+JSON format) and then processing those files.
 
 Users may provide a query to read from rather than reading all of a BigQuery
 table. If specified, the result obtained by executing the specified query will
@@ -78,6 +72,12 @@ When creating a BigQuery input transform, users should provide either a query
 or a table. Pipeline construction will fail with a validation error if neither
 or both are specified.
 
+When reading from BigQuery using `BigQuerySource`, bytes are returned as
+base64-encoded bytes. When reading via `ReadFromBigQuery`, bytes are returned
+as raw bytes without base64 encoding, because ReadFromBigQuery uses Avro
+exports by default. To get base64-encoded bytes, set the `use_json_exports`
+flag to export the data as JSON instead.
+
 Writing Data to BigQuery
 ========================
 
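The reading-side bytes behavior described in the hunk above can be sketched as follows; the table and the 'payload' column are placeholders.

  # Sketch of handling a BYTES column under either export format.
  # 'payload' and the table name are placeholder identifiers.
  import base64

  import apache_beam as beam

  USE_JSON_EXPORTS = False  # default: Avro exports return raw bytes

  def raw_payload(row):
    value = row['payload']
    # JSON exports deliver base64-encoded values; Avro exports deliver
    # the raw bytes directly.
    return base64.b64decode(value) if USE_JSON_EXPORTS else value

  with beam.Pipeline() as p:
    _ = (
        p
        | beam.io.ReadFromBigQuery(
            table='my-project:my_dataset.my_table',
            use_json_exports=USE_JSON_EXPORTS)
        | beam.Map(raw_payload))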
@@ -225,8 +225,7 @@ The GEOGRAPHY data type works with Well-Known Text (See
 https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing
 to BigQuery.
 BigQuery IO requires values of BYTES datatype to be encoded using base64
-encoding when writing to BigQuery. When bytes are read from BigQuery they are
-returned as base64-encoded bytes.
+encoding when writing to BigQuery.
 """
 
 # pytype: skip-file
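On the write side, the base64 requirement kept in the hunk above can be sketched like this; the table name and schema are placeholders.

  # Sketch of writing a BYTES field: values are base64-encoded before being
  # handed to WriteToBigQuery. Table name and schema are placeholders.
  import base64

  import apache_beam as beam

  rows = [{'id': 1, 'blob': base64.b64encode(b'\x00\xff')}]

  with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(rows)
        | beam.io.WriteToBigQuery(
            'my-project:my_dataset.blob_table',
            schema='id:INTEGER,blob:BYTES'))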
@@ -251,6 +250,7 @@ from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io.avroio import _create_avro_source as create_avro_source
 from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp import bigquery_tools
@@ -609,7 +609,8 @@ class _CustomBigQuerySource(BoundedSource):
       use_standard_sql=False,
       flatten_results=True,
       kms_key=None,
-      bigquery_job_labels=None):
+      bigquery_job_labels=None,
+      use_json_exports=False):
     if table is not None and query is not None:
       raise ValueError(
           'Both a BigQuery table and a query were specified.'
@@ -638,14 +639,17 @@ class _CustomBigQuerySource(BoundedSource):
     self.split_result = None
     self.options = pipeline_options
     self.bigquery_job_labels = bigquery_job_labels or {}
+    self.use_json_exports = use_json_exports
 
   def display_data(self):
+    export_format = 'JSON' if self.use_json_exports else 'AVRO'
     return {
         'table': str(self.table_reference),
         'query': str(self.query),
         'project': str(self.project),
         'use_legacy_sql': self.use_legacy_sql,
         'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
+        'export_file_format': export_format,
     }
 
   def estimate_size(self):
@@ -691,6 +695,17 @@ class _CustomBigQuerySource(BoundedSource):
       project = self.project
     return project
 
+  def _create_source(self, path, schema):
+    if not self.use_json_exports:
+      return create_avro_source(path, use_fastavro=True)
+    else:
+      return TextSource(
+          path,
+          min_bundle_size=0,
+          compression_type=CompressionTypes.UNCOMPRESSED,
+          strip_trailing_newlines=True,
+          coder=self.coder(schema))
+
   def split(self, desired_bundle_size, start_position=None, stop_position=None):
     if self.split_result is None:
       bq = bigquery_tools.BigQueryWrapper()
@@ -701,13 +716,10 @@ class _CustomBigQuerySource(BoundedSource):
 
       schema, metadata_list = self._export_files(bq)
       self.split_result = [
-          TextSource(
-              metadata.path,
-              0,
-              CompressionTypes.UNCOMPRESSED,
-              True,
-              self.coder(schema)) for metadata in metadata_list
+          self._create_source(metadata.path, schema)
+          for metadata in metadata_list
       ]
+
       if self.query is not None:
         bq.clean_up_temporary_dataset(self._get_project())
 
@@ -755,13 +767,23 @@ class _CustomBigQuerySource(BoundedSource):
       bigquery.TableSchema instance, a list of FileMetadata instances
     """
     job_id = uuid.uuid4().hex
-    job_ref = bq.perform_extract_job([self.gcs_location],
-                                     job_id,
-                                     self.table_reference,
-                                     bigquery_tools.FileFormat.JSON,
-                                     project=self._get_project(),
-                                     include_header=False,
-                                     job_labels=self.bigquery_job_labels)
+    if self.use_json_exports:
+      job_ref = bq.perform_extract_job([self.gcs_location],
+                                       job_id,
+                                       self.table_reference,
+                                       bigquery_tools.FileFormat.JSON,
+                                       project=self._get_project(),
+                                       job_labels=self.bigquery_job_labels,
+                                       include_header=False)
+    else:
+      job_ref = bq.perform_extract_job([self.gcs_location],
+                                       job_id,
+                                       self.table_reference,
+                                       bigquery_tools.FileFormat.AVRO,
+                                       project=self._get_project(),
+                                       include_header=False,
+                                       job_labels=self.bigquery_job_labels,
+                                       use_avro_logical_types=True)
     bq.wait_for_bq_job(job_ref)
     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
 
@@ -1627,7 +1649,8 @@ class ReadFromBigQuery(PTransform):
   """Read data from BigQuery.
 
     This PTransform uses a BigQuery export job to take a snapshot of the table
-    on GCS, and then reads from each produced JSON file.
+    on GCS, and then reads from each produced file. The file format is Avro
+    by default.
 
   Args:
     table (str, callable, ValueProvider): The ID of the table, or a callable
@@ -1671,7 +1694,23 @@ class ReadFromBigQuery(PTransform):
       to BigQuery export and query jobs created by this transform. See:
       https://cloud.google.com/bigquery/docs/reference/rest/v2/\
               Job#JobConfiguration
-  """
+    use_json_exports (bool): By default, this transform works by exporting
+      BigQuery data into Avro files, and reading those files. When this
+      parameter is set to True, the transform will instead export to JSON
+      files, which are slower to read due to their larger size.
+      When using JSON exports, the BigQuery types for DATE, DATETIME, TIME, and
+      TIMESTAMP will be exported as strings. This behavior is consistent with
+      BigQuerySource.
+      When using Avro exports, these fields will be exported as native Python
+      types (datetime.date, datetime.datetime, datetime.time,
+      and datetime.datetime respectively). Avro exports are recommended.
+      To learn more about BigQuery types, and Time-related type
+      representations, see: https://cloud.google.com/bigquery/docs/reference/\
+              standard-sql/data-types
+      To learn more about type conversions between BigQuery and Avro, see:
+      https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro\
+              #avro_conversions
+  """
   def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
     if gcs_location:
       if not isinstance(gcs_location, (str, unicode, ValueProvider)):
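To make the time-related type differences documented for use_json_exports above concrete, here is a hedged sketch that normalizes such fields to strings regardless of the export format; the normalization step is illustrative and not part of the transform.

  # Sketch: normalize date/time values so downstream code does not care
  # whether rows came from Avro exports (Python date/time objects) or JSON
  # exports (strings). Purely illustrative.
  import datetime

  import apache_beam as beam

  def normalize_row(row):
    out = dict(row)
    for key, value in row.items():
      # datetime.datetime is a subclass of datetime.date, so this check
      # covers DATE, DATETIME, TIME and TIMESTAMP values alike.
      if isinstance(value, (datetime.date, datetime.time)):
        out[key] = str(value)
    return out

  with beam.Pipeline() as p:
    _ = (
        p
        | beam.io.ReadFromBigQuery(table='my-project:my_dataset.my_table')
        | beam.Map(normalize_row))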
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 50ebcdc..ce82b31 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -23,6 +23,7 @@
 from __future__ import absolute_import
 
 import base64
+import datetime
 import logging
 import random
 import time
@@ -70,6 +71,18 @@ def skip(runners):
   return inner
 
 
+def datetime_to_utc(element):
+  for k, v in element.items():
+    if isinstance(v, (datetime.time, datetime.date)):
+      element[k] = str(v)
+    if isinstance(v, datetime.datetime) and v.tzinfo:
+      # For datetime objects, convert to a UTC-formatted string.
+      offset = v.utcoffset()
+      utc_dt = (v - offset).strftime('%Y-%m-%d %H:%M:%S.%f UTC')
+      element[k] = utc_dt
+  return element
+
+
 class BigQueryReadIntegrationTests(unittest.TestCase):
   BIG_QUERY_DATASET_ID = 'python_read_table_'
 
@@ -238,11 +251,12 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
     cls.bigquery_client.insert_rows(
         cls.project, cls.dataset_id, table_name, table_data)
 
-  def get_expected_data(self):
+  def get_expected_data(self, native=True):
+    byts = b'\xab\xac'
     expected_row = {
         'float': 0.33,
         'numeric': Decimal('10'),
-        'bytes': base64.b64encode(b'\xab\xac'),
+        'bytes': base64.b64encode(byts) if native else byts,
         'date': '3000-12-31',
         'time': '23:59:59',
         'datetime': '2018-12-31T12:44:31',
@@ -265,7 +279,8 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
   def test_native_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
-          p | 'read' >> beam.io.Read(
+          p
+          | 'read' >> beam.io.Read(
               beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
       assert_that(result, equal_to(self.get_expected_data()))
 
@@ -273,12 +288,14 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
   def test_iobase_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
-          p | 'read' >> beam.io.ReadFromBigQuery(
+          p
+          | 'read' >> beam.io.ReadFromBigQuery(
               query=self.query,
               use_standard_sql=True,
               project=self.project,
-              bigquery_job_labels={'launcher': 'apache_beam_tests'}))
-      assert_that(result, equal_to(self.get_expected_data()))
+              bigquery_job_labels={'launcher': 'apache_beam_tests'})
+          | beam.Map(datetime_to_utc))
+      assert_that(result, equal_to(self.get_expected_data(native=False)))
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index b036dc0..12c5dc0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -95,6 +95,12 @@ def default_encoder(obj):
     # on python 3 base64-encoded bytes are decoded to strings
     # before being sent to BigQuery
     return obj.decode('utf-8')
+  elif isinstance(obj, (datetime.date, datetime.time)):
+    return str(obj)
+  elif isinstance(obj, datetime.datetime):
+    return obj.isoformat()
+
+  _LOGGER.error("Unable to serialize %r to JSON", obj)
   raise TypeError(
       "Object of type '%s' is not JSON serializable" % type(obj).__name__)
 
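A small sketch of what the extended default_encoder above now handles when a row is serialized for a JSON-based load; the row values are illustrative, and the import assumes default_encoder stays a module-level function as shown in the hunk.

  # Sketch of the serialization enabled by the new date/time branches above.
  import datetime
  import json

  from apache_beam.io.gcp.bigquery_tools import default_encoder

  row = {
      'd': datetime.date(2020, 6, 16),                    # -> "2020-06-16"
      't': datetime.time(17, 55, 10),                     # -> "17:55:10"
      'ts': datetime.datetime(2020, 6, 16, 17, 55, 10),   # -> "2020-06-16T17:55:10"
  }
  encoded = json.dumps(row, default=default_encoder)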
@@ -720,6 +726,7 @@ class BigQueryWrapper(object):
       project=None,
       include_header=True,
       compression=ExportCompression.NONE,
+      use_avro_logical_types=False,
       job_labels=None):
     """Starts a job to export data from BigQuery.
 
@@ -738,6 +745,7 @@ class BigQueryWrapper(object):
                     printHeader=include_header,
                     destinationFormat=destination_format,
                     compression=compression,
+                    useAvroLogicalTypes=use_avro_logical_types,
                 ),
                 labels=_build_job_labels(job_labels),
             ),
@@ -1205,7 +1213,8 @@ class RowAsDictJsonCoder(coders.Coder):
       return json.dumps(
           table_row, allow_nan=False, default=default_encoder).encode('utf-8')
     except ValueError as e:
-      raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
+      raise ValueError(
+          '%s. %s. Row: %r' % (e, JSON_COMPLIANCE_ERROR, table_row))
 
   def decode(self, encoded_table_row):
     return json.loads(encoded_table_row.decode('utf-8'))