Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/30 23:40:44 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #17159: [BEAM-11587] Generate PColl element from TableSchema

TheNeuralBit commented on a change in pull request #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r839048090



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -2393,6 +2395,40 @@ def expand(self, pcoll):
           'The method to read from BigQuery must be either EXPORT'
           'or DIRECT_READ.')
 
+  def produce_pcoll_with_schema(self, project_id, dataset_id, table_id):

Review comment:
       Since this doesn't actually use `self`, I think it would be a good idea to pull this logic out into a helper function. [bigquery_tools.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_tools.py) could be a good place for it, or we could define a new module such as `bigquery_schema_tools.py` (there seems to be precedent for this: see `bigquery_avro_tools.py`). Would you have a preference, @pabloem?
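A minimal sketch of what such a module-level helper could look like. It assumes the helper takes the dict returned by `get_dict_table_schema` instead of looking the table up itself, and that type strings may use either the legacy names (`INTEGER`, `FLOAT`, `BOOLEAN`) or their aliases (`INT64`, `FLOAT64`, `BOOL`), as the BigQuery `TableFieldSchema` documentation lists. The function name `bq_schema_to_field_types` and the lookup table are hypothetical. To keep the sketch free of dependencies, it uses plain `int` and `float` where the PR uses `np.int64` and `np.float64`, and it leaves out `TIMESTAMP`.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical lookup table from BigQuery column types to Python types.
# Both the legacy names and their standard SQL aliases are listed.
# Plain int/float stand in for np.int64/np.float64 (no numpy dependency).
BIGQUERY_TO_PYTHON_TYPES = {
    'STRING': str,
    'INTEGER': int,
    'INT64': int,
    'FLOAT': float,
    'FLOAT64': float,
    'BOOLEAN': bool,
    'BOOL': bool,
    'BYTES': bytes,
}


def bq_schema_to_field_types(
    schema: Dict[str, Any]) -> List[Tuple[str, type]]:
  """Returns (name, type) pairs for a dict-form BigQuery table schema.

  Defined at module level because it needs no instance state.
  """
  field_types = []
  # Iterate over the field dicts directly instead of indexing with a counter.
  for field in schema['fields']:
    bq_type = field['type'].upper()
    if bq_type not in BIGQUERY_TO_PYTHON_TYPES:
      # Unmapped types (NUMERIC, TIMESTAMP, RECORD, ...) are rejected.
      raise ValueError(
          'Unsupported BigQuery type %r for field %r' %
          (bq_type, field['name']))
    field_types.append((field['name'], BIGQUERY_TO_PYTHON_TYPES[bq_type]))
  return field_types


example = {'fields': [{'name': 'id', 'type': 'INTEGER'},
                      {'name': 'name', 'type': 'STRING'}]}
print(bq_schema_to_field_types(example))
# prints [('id', <class 'int'>), ('name', <class 'str'>)]
```

A table lookup keeps the type mapping in one place. With this layout, supporting another type only means adding an entry, not another `elif` branch.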

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -2393,6 +2395,40 @@ def expand(self, pcoll):
           'The method to read from BigQuery must be either EXPORT'
           'or DIRECT_READ.')
 
+  def produce_pcoll_with_schema(self, project_id, dataset_id, table_id):
+    the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper(
+    ).get_table(project_id, dataset_id, table_id)
+    the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
+        the_table_schema)
+    i = 0
+    dict_of_tuples = []
+    for x in the_schema['fields']:
+      if the_schema['fields'][i]['type'] == 'STRING':
+        typ = str
+      elif the_schema['fields'][i]['type'] == 'INTEGER':
+        typ = np.int64
+      elif the_schema['fields'][i]['type'] == 'FLOAT':
+        typ = np.float64
+      elif the_schema['fields'][i]['type'] == 'NUMERIC':
+        typ = np.float128
+      elif the_schema['fields'][i]['type'] == 'BIGNUMERIC':
+        typ = np.float128

Review comment:
       I think we should hold off on defining mappings for these. `np.float128` isn't a perfect mapping, since it's a floating-point type and `NUMERIC`/`BIGNUMERIC` are fixed-point types.
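The concern can be demonstrated in a few lines. BigQuery's `NUMERIC` is an exact decimal type with 38 digits of precision and 9 digits of scale. A binary floating-point type such as `np.float128` can only approximate most decimal fractions. The sketch below uses Python's standard `decimal` module only to illustrate an exact decimal representation. Neither the PR nor the reviewer proposes this mapping, and `to_numeric` is a hypothetical helper.

```python
from decimal import Decimal, localcontext

# Binary floating point cannot store 0.1 exactly: the float's exact value
# differs from the decimal literal, and rounding errors show up in sums.
assert Decimal(0.1) != Decimal('0.1')
assert 0.1 + 0.2 != 0.3

# A decimal type keeps decimal fractions exact.
assert Decimal('0.1') + Decimal('0.2') == Decimal('0.3')

# NUMERIC has scale 9, i.e. at most 9 digits after the decimal point.
NUMERIC_QUANTUM = Decimal('1e-9')


def to_numeric(value: str) -> Decimal:
  """Hypothetical helper: parses a string into a scale-9 Decimal."""
  # The default decimal context keeps only 28 significant digits, fewer
  # than NUMERIC's 38, so widen the precision for this operation.
  with localcontext() as ctx:
    ctx.prec = 38
    # quantize rounds (half-even by default) to exactly 9 fractional digits.
    return Decimal(value).quantize(NUMERIC_QUANTUM)


print(to_numeric('1.5'))  # prints 1.500000000
```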

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -2393,6 +2395,40 @@ def expand(self, pcoll):
           'The method to read from BigQuery must be either EXPORT'
           'or DIRECT_READ.')
 
+  def produce_pcoll_with_schema(self, project_id, dataset_id, table_id):
+    the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper(
+    ).get_table(project_id, dataset_id, table_id)
+    the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
+        the_table_schema)
+    i = 0
+    dict_of_tuples = []
+    for x in the_schema['fields']:
+      if the_schema['fields'][i]['type'] == 'STRING':
+        typ = str
+      elif the_schema['fields'][i]['type'] == 'INTEGER':
+        typ = np.int64
+      elif the_schema['fields'][i]['type'] == 'FLOAT':
+        typ = np.float64
+      elif the_schema['fields'][i]['type'] == 'NUMERIC':
+        typ = np.float128
+      elif the_schema['fields'][i]['type'] == 'BIGNUMERIC':
+        typ = np.float128
+      elif the_schema['fields'][i]['type'] == 'BOOL':
+        typ = bool
+      elif the_schema['fields'][i]['type'] == 'BYTES':
+        typ = bytes
+      elif the_schema['fields'][i]['type'] == 'TIMESTAMP':
+        typ = beam.utils.timestamp.Timestamp
+      else:
+        raise ValueError(the_schema['fields'][i]['type'])
+      #TODO svetaksundhar@: Map remaining BQ types
+      dict_of_tuples.append((the_schema['fields'][i]['name'], typ))

Review comment:
       I think we also need to check whether these fields are nullable or required (if nullable, we should wrap the type in `Optional[T]`) and whether they are repeated (if so, we should wrap the type in `Sequence[T]`).
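A minimal sketch of the wrapping described above. It assumes each field dict may carry BigQuery's `mode` key (`NULLABLE`, `REQUIRED`, or `REPEATED`), and a missing mode is treated as `NULLABLE`, which is BigQuery's default. The names `wrap_for_mode`, `BASE_TYPES`, and `row_type_from_schema` are hypothetical. The last step builds an element type with `typing.NamedTuple` from the `(name, type)` pairs. That is one possible approach and is not taken from the PR.

```python
import typing
from typing import Any, Dict, Optional, Sequence


def wrap_for_mode(field: Dict[str, Any], base_type: type) -> Any:
  """Wraps base_type according to the field's BigQuery mode."""
  # BigQuery treats a field without an explicit mode as NULLABLE.
  mode = field.get('mode', 'NULLABLE').upper()
  if mode == 'REPEATED':
    return Sequence[base_type]
  if mode == 'NULLABLE':
    return Optional[base_type]
  if mode == 'REQUIRED':
    return base_type
  raise ValueError('Unknown mode %r for field %r' % (mode, field['name']))


# Hypothetical table limited to a few scalar types for brevity;
# any other type raises KeyError below.
BASE_TYPES = {'STRING': str, 'INTEGER': int, 'FLOAT': float}


def row_type_from_schema(name: str, schema: Dict[str, Any]) -> type:
  """Builds a NamedTuple element type from a dict-form table schema."""
  fields = [(f['name'], wrap_for_mode(f, BASE_TYPES[f['type']]))
            for f in schema['fields']]
  return typing.NamedTuple(name, fields)


schema = {'fields': [
    {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
    {'name': 'name', 'type': 'STRING'},
    {'name': 'tags', 'type': 'STRING', 'mode': 'REPEATED'},
]}
Row = row_type_from_schema('Row', schema)
print(Row(id=1, name=None, tags=['a', 'b']))
# prints Row(id=1, name=None, tags=['a', 'b'])
```

In this sketch, `REPEATED` fields get `Sequence[T]` without an extra `Optional`, matching the comment's description.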




-- 
This is an automated message from the Apache Git Service.