You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/10/26 15:53:12 UTC

[beam] branch release-2.43.0 updated: [cherry-pick][release-2.43.0] Avoid Circular imports related to bigquery_schema_tools (#23784)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch release-2.43.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.43.0 by this push:
     new edb250b258b [cherry-pick][release-2.43.0] Avoid Circular imports related to bigquery_schema_tools (#23784)
edb250b258b is described below

commit edb250b258bb80eb4549189f824cc529c46ac7f6
Author: Svetak Sundhar <sv...@google.com>
AuthorDate: Wed Oct 26 11:53:04 2022 -0400

    [cherry-pick][release-2.43.0] Avoid Circular imports related to bigquery_schema_tools (#23784)
    
    * Avoid Circular imports related to bigquery_schema_tools (#23731)
    
    * why rowcoder?
    
    * registering datetime.datetime as schema type
    
    * registering datetime.datetime as schema type
    
    * registered type in schemas.py
    
    * registered type in schemas.py
    
    * convert Timestamp to datetime.datetime, which will then get converted into apache_beam.utils.timestamp.Timestamp
    
    * experiment with converting to datetime.datetime
    
    * Timestamp to datetime.datetime mapping
    
    * Timestamp to datetime.datetime mapping
    
    * Timestamp to datetime.datetime mapping
    
    * np fix
    
    * apache_beam.utils.timestamp.Timestamp obj
    
    * apache_beam.utils.timestamp.Timestamp obj
    
    * fixed tests
    
    * Timestamp conversion
    
    * Timestamp conversion with lint
    
    * Timestamp conversion with lint
    
    * fix
    
    * fix
    
    * avoid circular import
    
    * avoid circular import
    
    * lint fixes
    
    * Avoid Circular imports related to bigquery_schema_tools (#23731)
    
    * why rowcoder?
    
    * registering datetime.datetime as schema type
    
    * registering datetime.datetime as schema type
    
    * registered type in schemas.py
    
    * registered type in schemas.py
    
    * convert Timestamp to datetime.datetime, which will then get converted into apache_beam.utils.timestamp.Timestamp
    
    * experiment with converting to datetime.datetime
    
    * Timestamp to datetime.datetime mapping
    
    * Timestamp to datetime.datetime mapping
    
    * Timestamp to datetime.datetime mapping
    
    * np fix
    
    * apache_beam.utils.timestamp.Timestamp obj
    
    * apache_beam.utils.timestamp.Timestamp obj
    
    * fixed tests
    
    * Timestamp conversion
    
    * Timestamp conversion with lint
    
    * Timestamp conversion with lint
    
    * fix
    
    * fix
    
    * avoid circular import
    
    * avoid circular import
    
    * lint fixes
    
    * Avoid Circular imports related to bigquery_schema_tools (#23731)
    
    * why rowcoder?
    
    * registering datetime.datetime as schema type
    
    * registering datetime.datetime as schema type
    
    * registered type in schemas.py
    
    * registered type in schemas.py
    
    * convert Timestamp to datetime.datetime, which will then get converted into apache_beam.utils.timestamp.Timestamp
    
    * experiment with converting to datetime.datetime
    
    * Timestamp to datetime.datetime mapping
    
    * Timestamp to datetime.datetime mapping
    
    * Timestamp to datetime.datetime mapping
    
    * np fix
    
    * apache_beam.utils.timestamp.Timestamp obj
    
    * apache_beam.utils.timestamp.Timestamp obj
    
    * fixed tests
    
    * Timestamp conversion
    
    * Timestamp conversion with lint
    
    * Timestamp conversion with lint
    
    * fix
    
    * fix
    
    * avoid circular import
    
    * avoid circular import
    
    * lint fixes
---
 sdks/python/apache_beam/io/gcp/bigquery.py              |  5 +++--
 sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py | 17 ++++++++++-------
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index bad20f69243..7233326ce0c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -369,6 +369,7 @@ from apache_beam.io import range_trackers
 from apache_beam.io.avroio import _create_avro_source as create_avro_source
 from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.io.filesystems import FileSystems
+from apache_beam.io.gcp import bigquery_schema_tools
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
 from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit
@@ -2471,9 +2472,9 @@ class ReadFromBigQuery(PTransform):
         raise TypeError(
             '%s: table must be of type string'
             '; got a callable instead' % self.__class__.__name__)
-      return output_pcollection | beam.io.gcp.bigquery_schema_tools.\
+      return output_pcollection | bigquery_schema_tools.\
             convert_to_usertype(
-            beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+            bigquery_tools.BigQueryWrapper().get_table(
                 project_id=table_details.projectId,
                 dataset_id=table_details.datasetId,
                 table_id=table_details.tableId).schema)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
index e78f7bd5a7f..4c25aa62e0b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
@@ -28,9 +28,13 @@ from typing import Sequence
 import numpy as np
 
 import apache_beam as beam
+import apache_beam.io.gcp.bigquery_tools
+import apache_beam.typehints.schemas
+import apache_beam.utils.proto_utils
 import apache_beam.utils.timestamp
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import DoFn
 
 # BigQuery types as listed in
 # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
@@ -91,13 +95,11 @@ def bq_field_to_type(field, mode):
 
 
 def convert_to_usertype(table_schema):
-  usertype = beam.io.gcp.bigquery_schema_tools. \
-        generate_user_type_from_bq_schema(table_schema)
-  return beam.ParDo(
-      beam.io.gcp.bigquery_schema_tools.BeamSchemaConversionDoFn(usertype))
+  usertype = generate_user_type_from_bq_schema(table_schema)
+  return beam.ParDo(BeamSchemaConversionDoFn(usertype))
 
 
-class BeamSchemaConversionDoFn(beam.DoFn):
+class BeamSchemaConversionDoFn(DoFn):
   def __init__(self, pcoll_val_ctor):
     self._pcoll_val_ctor = pcoll_val_ctor
 
@@ -113,8 +115,9 @@ class BeamSchemaConversionDoFn(beam.DoFn):
   @classmethod
   def _from_serialized_schema(cls, schema_str):
     return cls(
-        beam.typehints.schemas.named_tuple_from_schema(
-            beam.utils.proto_utils.parse_Bytes(schema_str, schema_pb2.Schema)))
+        apache_beam.typehints.schemas.named_tuple_from_schema(
+            apache_beam.utils.proto_utils.parse_Bytes(
+                schema_str, schema_pb2.Schema)))
 
   def __reduce__(self):
     # when pickling, use bytes representation of the schema.