Posted to issues@beam.apache.org by "Kamil Wasilewski (Jira)" <ji...@apache.org> on 2020/08/25 14:52:00 UTC

[jira] [Commented] (BEAM-10524) Default decoder for ReadFromBigQuery does not support repeatable fields

    [ https://issues.apache.org/jira/browse/BEAM-10524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17184105#comment-17184105 ] 

Kamil Wasilewski commented on BEAM-10524:
-----------------------------------------

> Is there any particular reason you decided not to support repeated fields?

I don't remember clearly. Most likely it was a mistake. In any case, the bugfix has been merged to master and will be available starting with the 2.25 release.
Thanks for reporting the issue!



> Default decoder for ReadFromBigQuery does not support repeatable fields
> -----------------------------------------------------------------------
>
>                 Key: BEAM-10524
>                 URL: https://issues.apache.org/jira/browse/BEAM-10524
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.23.0
>            Reporter: Roman Frigg
>            Assignee: Kamil Wasilewski
>            Priority: P2
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> The code in [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L570]
> handles decoding fields with mode "REPEATED" incorrectly. Running a query that returns repeated fields, which are represented as JSON arrays, fails with the following stack trace:
> {noformat}
> ...
>  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/concat_source.py", line 89, in read
>     range_tracker.sub_range_tracker(source_ix)):
>   File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/textio.py", line 210, in read_records
>     yield self._coder.decode(record)
>   File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 566, in decode
>     return self._decode_with_schema(value, self.fields)
>   File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 580, in _decode_with_schema
>     value[field.name], field.fields)
>   File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 575, in _decode_with_schema
>     value[field.name] = None
> TypeError: list indices must be integers or slices, not str
> {noformat}
>  
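
The failure in the stack trace above can be reproduced without Beam. The sketch below is only an illustration: the row contents and the helper set_missing_to_none are hypothetical, and the helper merely mirrors the "missing field" branch that the trace points at. When a repeated RECORD field is passed down as a whole, the decoder receives a list instead of a dict, so the membership test succeeds trivially and the assignment by field name fails.

```python
# Hypothetical row: "tags" stands for a RECORD field with mode REPEATED,
# so the exported JSON holds a list of dicts rather than a single dict.
row = {"tags": [{"label": "a"}, {"label": "b"}]}

nested = row["tags"]  # a list, not a dict


def set_missing_to_none(value, name):
    # Mirrors the "field missing from row" branch of the decoder.
    # For a list, `name not in value` is True because no element equals
    # the string, and the assignment then indexes the list with a str.
    if name not in value:
        value[name] = None


try:
    set_missing_to_none(nested, "label")
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```
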
> The fix could look something like this (untested):
> {code:python}
> def _decode_with_schema(self, value, schema_fields):
>     for field in schema_fields:
>         if field.name not in value:
>             # The field exists in the schema, but it doesn't exist in this row.
>             # It probably means its value was null, as the extract to JSON job
>             # doesn't preserve null fields
>             value[field.name] = None
>             continue
>         if field.type == 'RECORD':
>             if field.mode == 'REPEATED':
>                 value[field.name] = [self._decode_with_schema(val, field.fields)
>                                      for val in value[field.name]]
>             else:
>                 value[field.name] = self._decode_with_schema(value[field.name],
>                                                              field.fields)
>         else:
>             try:
>                 converter = self._converters[field.type]
>             except KeyError:
>                 # No need to do any conversion
>                 continue
>             if field.mode == 'REPEATED':
>                 # Build a list; on Python 3, map() would return a lazy iterator
>                 value[field.name] = [converter(val) for val in value[field.name]]
>             else:
>                 value[field.name] = converter(value[field.name])
>     return value
> {code}
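
As a rough check of the approach proposed above, here is a minimal standalone sketch that applies the same logic to a sample row. It does not use Beam: the Field namedtuple, the CONVERTERS table, and the sample schema and row are all assumptions made for illustration, and the actual coder in bigquery.py may differ.

```python
from collections import namedtuple

# Hypothetical stand-in for the schema field objects; only the attributes
# the decoder reads are modelled here.
Field = namedtuple("Field", ["name", "type", "mode", "fields"])

# Assumed converter table; the real coder's mapping may differ.
CONVERTERS = {"INTEGER": int, "FLOAT": float}


def decode_with_schema(value, schema_fields):
    for field in schema_fields:
        if field.name not in value:
            # Field is in the schema but absent from the row: treat as null.
            value[field.name] = None
            continue
        if field.type == "RECORD":
            if field.mode == "REPEATED":
                # Decode each nested record in the list separately.
                value[field.name] = [
                    decode_with_schema(val, field.fields)
                    for val in value[field.name]
                ]
            else:
                value[field.name] = decode_with_schema(
                    value[field.name], field.fields)
        else:
            converter = CONVERTERS.get(field.type)
            if converter is None:
                # No conversion needed for this type.
                continue
            if field.mode == "REPEATED":
                value[field.name] = [converter(val) for val in value[field.name]]
            else:
                value[field.name] = converter(value[field.name])
    return value


schema = [
    Field("id", "INTEGER", "NULLABLE", ()),
    Field("scores", "FLOAT", "REPEATED", ()),
    Field("items", "RECORD", "REPEATED", (
        Field("qty", "INTEGER", "NULLABLE", ()),
        Field("note", "STRING", "NULLABLE", ()),
    )),
]
row = {"id": "7", "scores": ["1.5", "2"],
       "items": [{"qty": "3"}, {"qty": "4", "note": "x"}]}

decoded = decode_with_schema(row, schema)
print(decoded)
```

Repeated scalar fields are converted element by element, and each element of a repeated RECORD field is decoded recursively, so a missing nested field ends up as None in that element only.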



--
This message was sent by Atlassian Jira
(v8.3.4#803005)