Posted to issues@beam.apache.org by "Miles McCain (Jira)" <ji...@apache.org> on 2021/07/26 17:16:00 UTC

[jira] [Created] (BEAM-12659) WriteToBigQuery in BundleBasedDirectRunner fails when method is FILE_LOADS

Miles McCain created BEAM-12659:
-----------------------------------

             Summary: WriteToBigQuery in BundleBasedDirectRunner fails when method is FILE_LOADS
                 Key: BEAM-12659
                 URL: https://issues.apache.org/jira/browse/BEAM-12659
             Project: Beam
          Issue Type: Bug
          Components: io-py-gcp, runner-py-direct
    Affects Versions: 2.31.0
         Environment: Ubuntu 20.04 (running inside Docker, python:3.8-slim)
            Reporter: Miles McCain


`WriteToBigQuery` fails when using the `FILE_LOADS` method in the `BundleBasedDirectRunner`.

The issue appears to be in `wait_for_bq_job`, which expects `job_reference` to be an actual JobReference instance rather than a string. However, the `WaitForBQJobs` DoFn appears to be [passing a string](https://github.com/apache/beam/blob/5a029fd97d663e19a9bcd6bff61648bccbd7f95b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L730) as the argument. I believe this happens during the copy step. I'm not calling this code directly, so unfortunately I can't just pass a JobReference instance myself.

Here is a traceback:

```
request_worker_1      | ERROR:root:Traceback (most recent call last):
request_worker_1      |   File "/app/main.py", line 209, in process_message
request_worker_1      |     construct_and_run_pipeline(request)
request_worker_1      |   File "/app/main.py", line 190, in construct_and_run_pipeline
request_worker_1      |     return result.wait_until_finish()
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 588, in wait_until_finish
request_worker_1      |     self._executor.await_completion()
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 433, in await_completion
request_worker_1      |     self._executor.await_completion()
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 482, in await_completion
request_worker_1      |     raise t(v).with_traceback(tb)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 371, in call
request_worker_1      |     self.attempt_call(
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 414, in attempt_call
request_worker_1      |     evaluator.process_element(value)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 880, in process_element
request_worker_1      |     self.runner.process(element)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1225, in process
request_worker_1      |     self._reraise_augmented(exn)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1306, in _reraise_augmented
request_worker_1      |     raise new_exn.with_traceback(tb)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1223, in process
request_worker_1      |     return self.do_fn_invoker.invoke_process(windowed_value)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 752, in invoke_process
request_worker_1      |     self._invoke_process_per_window(
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 877, in _invoke_process_per_window
request_worker_1      |     self.process_method(*args_for_process),
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
request_worker_1      |     self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 562, in wait_for_bq_job
request_worker_1      |     job_reference.projectId, job_reference.jobId, job_reference.location)
request_worker_1      | AttributeError: 'str' object has no attribute 'projectId' [while running 'write tweets to bigquery/Write/BigQueryBatchFileLoads/WaitForTempTableLoadJobs']
```
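
The failure mode in the last frame can be sketched without Beam at all. The snippet below is only an illustration under stated assumptions: `FakeJobReference` and `describe_job` are hypothetical stand-ins, not Beam's or BigQuery's real classes or functions. They mimic the attribute access that the traceback shows inside `wait_for_bq_job`.

```python
from dataclasses import dataclass


@dataclass
class FakeJobReference:
    """Hypothetical stand-in for a job reference object (not Beam's class)."""
    projectId: str
    jobId: str
    location: str


def describe_job(job_reference):
    # Mirrors the attribute access seen on the last traceback frame:
    # job_reference.projectId, job_reference.jobId, job_reference.location
    return (job_reference.projectId, job_reference.jobId, job_reference.location)


# Passing an object with the expected attributes works.
ok = describe_job(FakeJobReference("my-project", "job-123", "US"))

# Passing a plain string fails on the first attribute lookup.
try:
    describe_job("job-123")
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
```

Here `error_message` ends up holding an `AttributeError` message about the missing `projectId` attribute, which matches the shape of the error in the traceback.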

Here is the `WriteToBigQuery` step that is failing (note that the callable passed for `table` returns a TableReference instance):

```python
WriteToBigQuery(
    table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    ignore_insert_ids=True,
    # method="STREAMING_INSERTS",  # using STREAMING_INSERTS 'fixes' the issue
    batch_size=int(os.getenv("BIGQUERY_BATCH_SIZE", "10")),
    schema=schema,
)
``` 

Note that this issue does not occur when using the standard `DirectRunner`, nor does it occur when using the `STREAMING_INSERTS` method.
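
Based only on the observations above, a temporary workaround could be to pick the write method from the runner name. This is a rough sketch, not a fix: `choose_write_method` is a hypothetical helper, and the only facts it encodes are the ones reported here (the failure shows up with `FILE_LOADS` on the `BundleBasedDirectRunner`, and not with `STREAMING_INSERTS`).

```python
def choose_write_method(runner_name: str) -> str:
    """Hypothetical helper: avoid FILE_LOADS on the runner where it fails."""
    if runner_name == "BundleBasedDirectRunner":
        # Reported above: STREAMING_INSERTS does not hit the error.
        return "STREAMING_INSERTS"
    # Other runners (e.g. the standard DirectRunner) were not affected.
    return "FILE_LOADS"


method_for_bundle_runner = choose_write_method("BundleBasedDirectRunner")
method_for_direct_runner = choose_write_method("DirectRunner")
```

The returned string would then be passed as the `method` argument of `WriteToBigQuery`, in the same way as the commented-out line in the snippet above.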

Thanks! (And apologies if I left out any important information. This is the first issue I've opened here.)


