Posted to issues@beam.apache.org by "Evan Galpin (Jira)" <ji...@apache.org> on 2022/03/04 16:23:00 UTC
[jira] [Commented] (BEAM-13487) WriteToBigQuery Dynamic table destinations returns wrong tableId
[ https://issues.apache.org/jira/browse/BEAM-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501421#comment-17501421 ]
Evan Galpin commented on BEAM-13487:
------------------------------------
[~aremedi] I traced the code, and I believe the issue could be that the lambda provided for the `table` param should return a TableReference object rather than a string. That note [does appear in the docstring|https://github.com/apache/beam/blob/2aa4da02bc49be72b45e8327c5233c0e4938e59d/sdks/python/apache_beam/io/gcp/bigquery.py#L1942-L1944], but I missed it initially as well.
TL;DR: modify `compute_table_name` to return a TableReference, and this issue should hopefully clear up.
`table` is used in the `WriteToBigQuery` constructor in 2 places:
1. [https://github.com/apache/beam/blob/2aa4da02bc49be72b45e8327c5233c0e4938e59d/sdks/python/apache_beam/io/gcp/bigquery.py#L2077]
2. [https://github.com/apache/beam/blob/2aa4da02bc49be72b45e8327c5233c0e4938e59d/sdks/python/apache_beam/io/gcp/bigquery.py#L2080-L2081]
Item 2 above involves passing the table to `bigquery_tools.parse_table_reference` ([source for bigquery_tools.parse_table_reference|https://github.com/apache/beam/blob/2aa4da02bc49be72b45e8327c5233c0e4938e59d/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L212-L248]). The result is then stored as `self.table_reference`. `self.table_reference` is passed to, for example, [_StreamToBigQuery|https://github.com/apache/beam/blob/2aa4da02bc49be72b45e8327c5233c0e4938e59d/sdks/python/apache_beam/io/gcp/bigquery.py#L2159], and is later passed to [bigquery_tools.AppendDestinationsFn|https://github.com/apache/beam/blob/2aa4da02bc49be72b45e8327c5233c0e4938e59d/sdks/python/apache_beam/io/gcp/bigquery.py#L1854] before finally being called.
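To make that flow a bit more concrete, here is a rough standard-library-only sketch of the three input shapes a `table` argument can take. `TableRef`, `parse_table_spec`, and the regex are hypothetical stand-ins written for illustration; they are not Beam's `TableReference` or the real `parse_table_reference`, and the assumption that a callable is passed through untouched and only invoked later is based on my reading of the linked source.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TableRef:
    """Hypothetical stand-in for Beam's bigquery.TableReference."""
    projectId: Optional[str]
    datasetId: str
    tableId: str


# Simplified pattern for "project:dataset.table" or "dataset.table".
# Assumption: the real parser accepts more forms than this.
_TABLE_SPEC = re.compile(
    r"^(?:(?P<project>[^:]+):)?(?P<dataset>\w+)\.(?P<table>[\w$-]+)$"
)


def parse_table_spec(table):
    """Illustrative only: handle a TableRef, a callable, or a string spec."""
    if isinstance(table, TableRef):
        return table
    if callable(table):
        # Assumed behavior: callables are stored as-is and called per element later.
        return table
    match = _TABLE_SPEC.match(table)
    if match is None:
        raise ValueError(f"Expected [project:]dataset.table, got {table!r}")
    return TableRef(match["project"], match["dataset"], match["table"])


def compute_table_name(element):
    # Hypothetical per-element destination function returning a reference object.
    return TableRef("project_id", "dataset", element["table"])


# A callable comes back unchanged; it is only resolved when called with an element.
destination = parse_table_spec(compute_table_name)
print(destination({"table": "table_id"}))

# A plain string is parsed into its components immediately.
print(parse_table_spec("project_id:dataset.table_id"))
```

If that reading is right, a callable `table` is never resolved at construction time, so whatever it returns per element is what ends up being used as the destination.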
> WriteToBigQuery Dynamic table destinations returns wrong tableId
> ----------------------------------------------------------------
>
> Key: BEAM-13487
> URL: https://issues.apache.org/jira/browse/BEAM-13487
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp
> Affects Versions: 2.34.0
> Reporter: Augusto
> Priority: P1
>
> I am trying to write to bigquery to different table destinations and I would like to create the tables dynamically if they don't exist already.
> {code:java}
> bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(
>     lambda e: compute_table_name(e),
>     schema=compute_table_schema,
>     additional_bq_parameters=additional_bq_parameters,
>     write_disposition=BigQueryDisposition.WRITE_APPEND,
>     create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
> ){code}
> The function compute_table_name is quite simple actually, I am just trying to get it to work.
> {code:java}
> def compute_table_name(element):
>     if element['table'] == 'table_id':
>         del element['table']
>         return "project_id:dataset.table_id" {code}
> The schema is detected correctly and the table IS created and populated with records. The problem is, the table ID I get is something along the lines of:
> {code:java}
> datasetId: 'dataset'
> projectId: 'project_id'
> tableId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP... {code}
> I have also tried returning a bigquery.TableReference object in my compute_table_name function to no avail.
> {code:java}
> def compute_table_name(element):
>     if element['table'] == 'Radio':
>         del element['table']
>         return TableReference(
>             datasetId = "dataset_id",
>             projectId = "project_id",
>             tableId = "table_id"
>         ) {code}