Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/10/13 17:25:01 UTC

[jira] [Updated] (BEAM-12659) WriteToBigQuery in BundleBasedDirectRunner fails when method is FILE_LOADS

     [ https://issues.apache.org/jira/browse/BEAM-12659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-12659:
---------------------------------
    Labels: GCP newbie stale-assigned  (was: GCP newbie)

> WriteToBigQuery in BundleBasedDirectRunner fails when method is FILE_LOADS
> --------------------------------------------------------------------------
>
>                 Key: BEAM-12659
>                 URL: https://issues.apache.org/jira/browse/BEAM-12659
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp, runner-py-direct
>    Affects Versions: 2.31.0
>         Environment: Ubuntu 20.04 (running inside Docker, python:3.8-slim)
>            Reporter: Miles McCain
>            Assignee: Pablo Estrada
>            Priority: P2
>              Labels: GCP, newbie, stale-assigned
>
> `WriteToBigQuery` fails when using the `FILE_LOADS` method in the `BundleBasedDirectRunner`.
> The issue appears to be in `wait_for_bq_job`, where the function expects `job_reference` to be an actual JobReference instance and not a string. However, the `WaitForBQJobs` DoFn appears to be [passing a string](https://github.com/apache/beam/blob/5a029fd97d663e19a9bcd6bff61648bccbd7f95b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L730) as the argument. I believe this is during the copy step, and I'm not calling this code directly (so unfortunately I can't just pass a TableReference instance myself).
> Here is a traceback:
>  
> {code:java}
> request_worker_1      | ERROR:root:Traceback (most recent call last):
> request_worker_1      |   File "/app/main.py", line 209, in process_message
> request_worker_1      |     construct_and_run_pipeline(request)
> request_worker_1      |   File "/app/main.py", line 190, in construct_and_run_pipeline
> request_worker_1      |     return result.wait_until_finish()
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 588, in wait_until_finish
> request_worker_1      |     self._executor.await_completion()
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 433, in await_completion
> request_worker_1      |     self._executor.await_completion()
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 482, in await_completion
> request_worker_1      |     raise t(v).with_traceback(tb)
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 371, in call
> request_worker_1      |     self.attempt_call(
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 414, in attempt_call
> request_worker_1      |     evaluator.process_element(value)
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 880, in process_element
> request_worker_1      |     self.runner.process(element)
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1225, in process
> request_worker_1      |     self._reraise_augmented(exn)
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1306, in _reraise_augmented
> request_worker_1      |     raise new_exn.with_traceback(tb)
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1223, in process
> request_worker_1      |     return self.do_fn_invoker.invoke_process(windowed_value)
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 752, in invoke_process
> request_worker_1      |     self._invoke_process_per_window(
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 877, in _invoke_process_per_window
> request_worker_1      |     self.process_method(*args_for_process),
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
> request_worker_1      |     self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
> request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 562, in wait_for_bq_job
> request_worker_1      |     job_reference.projectId, job_reference.jobId, job_reference.location)
> request_worker_1      | AttributeError: 'str' object has no attribute 'projectId' [while running 'write tweets to bigquery/Write/BigQueryBatchFileLoads/WaitForTempTableLoadJobs']
> {code}
>  
> Here is the `WriteToBigQuery` step that is failing (note that the callable passed for `table` returns a TableReference instance):
> {code:python}
> WriteToBigQuery(
>      table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
>      create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
>      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
>      ignore_insert_ids=True,
>      method="FILE_LOADS", # using STREAMING_INSERTS 'fixes' the issue
>      batch_size=int(os.getenv("BIGQUERY_BATCH_SIZE", "10")),
>      schema=schema,
> )
> {code}
>  
> Note that this issue does not occur when using the standard `DirectRunner`, nor does it occur when using the `STREAMING_INSERTS` method.
> Thanks! (And apologies if I left out any important information. This is the first issue I've opened here.)
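
The failure mode described in the report can be illustrated without Beam or BigQuery. The sketch below is an illustration only, not Beam's implementation: `JobReference`, `describe_job`, and `try_describe` are hypothetical stand-ins, with `describe_job` mimicking just the attribute access shown on the last frame of the traceback. It assumes nothing about how `WaitForBQJobs` obtains its references.

```python
from dataclasses import dataclass


@dataclass
class JobReference:
    """Hypothetical stand-in for the BigQuery JobReference message."""
    projectId: str
    jobId: str
    location: str


def describe_job(job_reference):
    """Hypothetical stand-in for the attribute access in wait_for_bq_job."""
    # Reads the same three attributes that appear in the traceback.
    return (job_reference.projectId, job_reference.jobId, job_reference.location)


def try_describe(job_reference):
    """Return the attribute tuple on success, or the error message on failure."""
    try:
        return describe_job(job_reference)
    except AttributeError as exc:
        return str(exc)


# A real reference object exposes the expected attributes.
ok = try_describe(JobReference("my-project", "job-123", "US"))

# A plain string does not, which matches the AttributeError in the report.
bad = try_describe("job-123")

print(ok)
print(bad)
```

If the diagnosis in the report is right, a fix would need either the caller to pass a reference object or the callee to accept both forms; which of the two fits Beam's code is not something this sketch can settle.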


