You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/10/26 15:53:12 UTC
[beam] branch release-2.43.0 updated: [cherry-pick][release-2.43.0] Avoid Circular imports related to bigquery_schema_tools (#23784)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch release-2.43.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.43.0 by this push:
new edb250b258b [cherry-pick][release-2.43.0] Avoid Circular imports related to bigquery_schema_tools (#23784)
edb250b258b is described below
commit edb250b258bb80eb4549189f824cc529c46ac7f6
Author: Svetak Sundhar <sv...@google.com>
AuthorDate: Wed Oct 26 11:53:04 2022 -0400
[cherry-pick][release-2.43.0] Avoid Circular imports related to bigquery_schema_tools (#23784)
* Avoid Circular imports related to bigquery_schema_tools (#23731)
* why rowcoder?
* registering datetime.datetime as schema type
* registering datetime.datetime as schema type
* registered type in schemas.py
* registered type in schemas.py
* convert Timestamp to datetime.datetime, which will then get converted into apache_beam.utils.timestamp.Timestamp
* experiment with converting to datetime.datetime
* Timestamp to datetime.datetime mapping
* Timestamp to datetime.datetime mapping
* Timestamp to datetime.datetime mapping
* np fix
* apache_beam_utils.timestamp.Timestamp obj
* apache_beam_utils.timestamp.Timestamp obj
* fixed tests
* Timestamp conversion
* Timestamp conversion with lint
* Timestamp conversion with lint
* fix
* fix
* avoid circular import
* avoid circular import
* lint fixes
* Avoid Circular imports related to bigquery_schema_tools (#23731)
* why rowcoder?
* registering datetime.datetime as schema type
* registering datetime.datetime as schema type
* registered type in schemas.py
* registered type in schemas.py
* convert Timestamp to datetime.datetime, which will then get converted into apache_beam.utils.timestamp.Timestamp
* experiment with converting to datetime.datetime
* Timestamp to datetime.datetime mapping
* Timestamp to datetime.datetime mapping
* Timestamp to datetime.datetime mapping
* np fix
* apache_beam_utils.timestamp.Timestamp obj
* apache_beam_utils.timestamp.Timestamp obj
* fixed tests
* Timestamp conversion
* Timestamp conversion with lint
* Timestamp conversion with lint
* fix
* fix
* avoid circular import
* avoid circular import
* lint fixes
* Avoid Circular imports related to bigquery_schema_tools (#23731)
* why rowcoder?
* registering datetime.datetime as schema type
* registering datetime.datetime as schema type
* registered type in schemas.py
* registered type in schemas.py
* convert Timestamp to datetime.datetime, which will then get converted into apache_beam.utils.timestamp.Timestamp
* experiment with converting to datetime.datetime
* Timestamp to datetime.datetime mapping
* Timestamp to datetime.datetime mapping
* Timestamp to datetime.datetime mapping
* np fix
* apache_beam_utils.timestamp.Timestamp obj
* apache_beam_utils.timestamp.Timestamp obj
* fixed tests
* Timestamp conversion
* Timestamp conversion with lint
* Timestamp conversion with lint
* fix
* fix
* avoid circular import
* avoid circular import
* lint fixes
---
sdks/python/apache_beam/io/gcp/bigquery.py | 5 +++--
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py | 17 ++++++++++-------
2 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index bad20f69243..7233326ce0c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -369,6 +369,7 @@ from apache_beam.io import range_trackers
from apache_beam.io.avroio import _create_avro_source as create_avro_source
from apache_beam.io.filesystems import CompressionTypes
from apache_beam.io.filesystems import FileSystems
+from apache_beam.io.gcp import bigquery_schema_tools
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit
@@ -2471,9 +2472,9 @@ class ReadFromBigQuery(PTransform):
raise TypeError(
'%s: table must be of type string'
'; got a callable instead' % self.__class__.__name__)
- return output_pcollection | beam.io.gcp.bigquery_schema_tools.\
+ return output_pcollection | bigquery_schema_tools.\
convert_to_usertype(
- beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+ bigquery_tools.BigQueryWrapper().get_table(
project_id=table_details.projectId,
dataset_id=table_details.datasetId,
table_id=table_details.tableId).schema)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
index e78f7bd5a7f..4c25aa62e0b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
@@ -28,9 +28,13 @@ from typing import Sequence
import numpy as np
import apache_beam as beam
+import apache_beam.io.gcp.bigquery_tools
+import apache_beam.typehints.schemas
+import apache_beam.utils.proto_utils
import apache_beam.utils.timestamp
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import DoFn
# BigQuery types as listed in
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
@@ -91,13 +95,11 @@ def bq_field_to_type(field, mode):
def convert_to_usertype(table_schema):
- usertype = beam.io.gcp.bigquery_schema_tools. \
- generate_user_type_from_bq_schema(table_schema)
- return beam.ParDo(
- beam.io.gcp.bigquery_schema_tools.BeamSchemaConversionDoFn(usertype))
+ usertype = generate_user_type_from_bq_schema(table_schema)
+ return beam.ParDo(BeamSchemaConversionDoFn(usertype))
-class BeamSchemaConversionDoFn(beam.DoFn):
+class BeamSchemaConversionDoFn(DoFn):
def __init__(self, pcoll_val_ctor):
self._pcoll_val_ctor = pcoll_val_ctor
@@ -113,8 +115,9 @@ class BeamSchemaConversionDoFn(beam.DoFn):
@classmethod
def _from_serialized_schema(cls, schema_str):
return cls(
- beam.typehints.schemas.named_tuple_from_schema(
- beam.utils.proto_utils.parse_Bytes(schema_str, schema_pb2.Schema)))
+ apache_beam.typehints.schemas.named_tuple_from_schema(
+ apache_beam.utils.proto_utils.parse_Bytes(
+ schema_str, schema_pb2.Schema)))
def __reduce__(self):
# when pickling, use bytes representation of the schema.