Posted to issues@beam.apache.org by "Noah Goodrich (Jira)" <ji...@apache.org> on 2019/10/21 21:48:00 UTC
[jira] [Commented] (BEAM-8452) TriggerLoadJobs.process in bigquery_file_loads schema is type str
[ https://issues.apache.org/jira/browse/BEAM-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16956470#comment-16956470 ]
Noah Goodrich commented on BEAM-8452:
-------------------------------------
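I would like to propose the following fix for bigquery_file_loads.TriggerLoadJobs.process. It converts a schema that arrives as a JSON string or a dict (for example, the value of a ValueProvider) into a TableSchema before the load job is triggered: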
{code:python}
# Each load job is assumed to have files respecting these constraints:
# 1. Total size of all files < 15 TB (max size for load jobs)
# 2. Total no. of files in a single load job < 10,000
# This assumption means that there will always be a single load job
# triggered for each partition of files.
destination = element[0]
files = element[1]

# Resolve the schema for this destination.
if callable(self.schema):
    schema = self.schema(destination, *schema_side_inputs)
elif isinstance(self.schema, vp.ValueProvider):
    schema = self.schema.get()
else:
    schema = self.schema

# The resolved value may be a JSON string or a dict, but the load job
# needs a TableSchema message, so convert both forms here.
if isinstance(schema, (str, unicode)):
    schema = bigquery_tools.parse_table_schema_from_json(schema)
elif isinstance(schema, dict):
    schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
{code}
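The branch order of the proposed fix can be exercised without Beam installed. The following is a dependency-free sketch, not the Beam code: `FakeValueProvider` is a hypothetical stand-in for `vp.ValueProvider`, `normalize_schema` is a hypothetical helper, and where the real fix calls `bigquery_tools.parse_table_schema_from_json` to build a `TableSchema` message, this sketch stops at the parsed dict.

```python
import json


class FakeValueProvider:
    """Hypothetical stand-in for Beam's ValueProvider; only .get() is modeled."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


def normalize_schema(schema, destination, *schema_side_inputs):
    """Hypothetical helper following the proposed fix's branch order.

    Returns a dict in BigQuery's {"fields": [...]} shape instead of a
    TableSchema message (assumption: no Beam dependency available).
    """
    # 1. A callable computes the schema per destination (plus side inputs).
    if callable(schema):
        schema = schema(destination, *schema_side_inputs)
    # 2. A value provider is resolved at run time; its value may be a JSON string.
    elif isinstance(schema, FakeValueProvider):
        schema = schema.get()
    # 3. Convert a JSON string or a dict into one structured form.
    if isinstance(schema, str):
        return json.loads(schema)
    if isinstance(schema, dict):
        # Round-trip through JSON, mirroring json.dumps(...) in the proposed fix.
        return json.loads(json.dumps(schema))
    # Anything else (e.g. an already-built schema object) passes through unchanged.
    return schema


SCHEMA_JSON = '{"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}]}'
resolved = normalize_schema(FakeValueProvider(SCHEMA_JSON), "project:dataset.table")
print(resolved["fields"][0]["name"])  # id
```

Applying the string/dict conversion after the if/elif/else, rather than only in the ValueProvider branch, means a callable that returns JSON is also covered.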
> TriggerLoadJobs.process in bigquery_file_loads schema is type str
> -----------------------------------------------------------------
>
> Key: BEAM-8452
> URL: https://issues.apache.org/jira/browse/BEAM-8452
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.15.0, 2.16.0
> Reporter: Noah Goodrich
> Assignee: Noah Goodrich
> Priority: Major
>
> I've found an issue with the BigQueryFileLoads transform: the schema parameter reaches the load job as a str instead of a TableSchema.
> {code:java}
> Triggering job beam_load_2019_10_11_140829_19_157670e4d458f0ff578fbe971a91b30a_1570802915 to load data to BigQuery table <TableReference
> datasetId: 'pyr_monat_dev'
> projectId: 'icentris-ml-dev'
> tableId: 'tree_user_types'>.Schema: {"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, {"name": "description", "type": "STRING", "mode": "nullable"}]}. Additional parameters: {}
> Retry with exponential backoff: waiting for 4.875033410381894 seconds before retrying _insert_load_job because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableSchema'> for field schema, found {"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, {"name": "description", "type": "STRING", "mode": "nullable"}]} (type <class 'str'>)
> Traceback for above exception (most recent call last):
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 206, in wrapper
>     return fun(*args, **kwargs)
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 344, in _insert_load_job
>     **additional_load_parameters
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 791, in __init__
>     setattr(self, name, value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 973, in __setattr__
>     object.__setattr__(self, name, value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1652, in __set__
>     super(MessageField, self).__set__(message_instance, value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1293, in __set__
>     value = self.validate(value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1400, in validate
>     return self.__validate(value, self.validate_element)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1358, in __validate
>     return validate_element(value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1340, in validate_element
>     (self.type, name, value, type(value)))
>
> {code}
>
> The triggering code looks like this:
>
> {code:python}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import DebugOptions, SetupOptions
>
> # `options` (a PipelineOptions instance), LoadSqlToBqOptions,
> # ReadFromRelationalDBFn and BuildForBigQueryFn are defined elsewhere in the job.
> # Experiment flag that selects the BigQueryFileLoads write path.
> options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']
> # Save main session state so pickled functions and classes
> # defined in __main__ can be unpickled
> options.view_as(SetupOptions).save_main_session = True
> custom_options = options.view_as(LoadSqlToBqOptions)
> with beam.Pipeline(options=options) as p:
>     (p
>      | "Initializing with empty collection" >> beam.Create([1])
>      | "Reading records from CloudSql" >> beam.ParDo(ReadFromRelationalDBFn(
>          username=custom_options.user,
>          password=custom_options.password,
>          database=custom_options.database,
>          table=custom_options.table,
>          key_field=custom_options.key_field,
>          batch_size=custom_options.batch_size))
>      | "Converting Row Object for BigQuery" >> beam.ParDo(BuildForBigQueryFn(custom_options.bq_schema))
>      | "Writing to BigQuery" >> beam.io.WriteToBigQuery(
>          table=custom_options.bq_table,
>          schema=custom_options.bq_schema,
>          write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
>          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
> {code}
>
>
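The traceback in the quoted report ends in apitools' field validation: the load job's `schema` field is typed as a `TableSchema` message, so assigning the raw JSON `str` fails on assignment, before any request reaches BigQuery, and the retry wrapper keeps hitting the same error. The following dependency-free sketch mimics only that check; `ValidationError`, `TableSchema` and `LoadConfig` here are hypothetical stand-ins, not the apitools classes.

```python
class ValidationError(Exception):
    """Hypothetical stand-in for apitools' protorpclite ValidationError."""


class TableSchema:
    """Hypothetical stand-in for the generated TableSchema message."""

    def __init__(self, fields=None):
        self.fields = list(fields or [])


class LoadConfig:
    """Hypothetical message with one typed field, validated on assignment."""

    _field_types = {"schema": TableSchema}

    def __setattr__(self, name, value):
        expected = self._field_types.get(name)
        # Mirror the check in the traceback: reject a value of the wrong type.
        if expected is not None and value is not None and not isinstance(value, expected):
            raise ValidationError(
                "Expected type %r for field %s, found %r (type %r)"
                % (expected, name, value, type(value)))
        object.__setattr__(self, name, value)


config = LoadConfig()
config.schema = TableSchema(fields=[{"name": "id", "type": "INTEGER"}])  # accepted
try:
    config.schema = '{"fields": []}'  # a JSON string, as in the report
except ValidationError as err:
    print("rejected:", type(err).__name__)  # rejected: ValidationError
```

Because the rejection is deterministic, retrying cannot succeed. The string has to be converted to a schema object before the message is built, which is what the proposed fix does.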
--
This message was sent by Atlassian Jira
(v8.3.4#803005)