Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/03 20:35:30 UTC
[GitHub] [beam] satybald commented on pull request #15185: [BEAM-10917] Add support for BigQuery Read API in Python BEAM
satybald commented on pull request #15185:
URL: https://github.com/apache/beam/pull/15185#issuecomment-892147868
hey @vachan-shetty, I've tried out the new BQ Storage PTransform and found some issues. Let me know what you think of them:
The pipeline code:
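```
import apache_beam as beam

# ReadBQStorage is the BigQuery Storage read PTransform under test, and
# `options` holds the Dataflow pipeline options listed below.
# The dataset_latencies table is 3.2 TB.
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | ReadBQStorage(project="GCP_PROJECT", dataset="telemetry", table="dataset_latencies")
        | 'Count all elements' >> beam.combiners.Count.Globally()
        | beam.LogElements()
    )
```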
This pipeline (let's call it **V1**) ran for 8 hours on 50 workers (machine type `n2-standard-8`) with Dataflow Runner V1. Experiment options:
```
['use_beam_bq_sink', 'disable_runner_v2', 'shuffle_mode=service', 'use_monitoring_state_manager', 'enable_execution_details_collection', 'use_fastavro']
```
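For reproducibility, the V1 settings can be written as the command-line style flags that Beam's Python `PipelineOptions` parses; `--experiments` may be repeated once per experiment. This is only a sketch: `experiment_flags` and `V1_ARGS` are hypothetical names, and treating "50 workers" as a fixed `--num_workers=50` is an assumption.

```python
# Experiment list from the V1 run above.
V1_EXPERIMENTS = [
    "use_beam_bq_sink",
    "disable_runner_v2",
    "shuffle_mode=service",
    "use_monitoring_state_manager",
    "enable_execution_details_collection",
    "use_fastavro",
]


def experiment_flags(experiments):
    """Return one --experiments=<name> flag per experiment (hypothetical helper)."""
    return [f"--experiments={name}" for name in experiments]


# Assumes "50 workers" means a fixed worker count. The project, region and
# temp_location flags that Dataflow also needs are omitted here.
V1_ARGS = [
    "--runner=DataflowRunner",
    "--num_workers=50",
    "--worker_machine_type=n2-standard-8",
] + experiment_flags(V1_EXPERIMENTS)

print("\n".join(V1_ARGS))
# With apache_beam installed, the list could be turned into options with:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(V1_ARGS)
```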
Pipeline V1 also logged the following exception:
```
Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 107, in next
return six.next(self._wrapped)
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
return self._next()
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 706, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "No status received"
debug_error_string = "{"created":"@1627723826.888538925","description":"No status received","file":"src/core/lib/surface/call.cc","file_line":1081,"grpc_status":2}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 48, in dataflow_worker.native_operations.NativeReadOperation.start
File "/usr/local/lib/python3.7/site-packages/terra/beam/io/beam_storage_api.py", line 294, in __next__
self.read_rows_response = next(self.read_rows_iterator, None)
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1/reader.py", line 134, in __iter__
for message in self._wrapped:
File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 110, in next
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "<string>", line 3, in raise_from
google.api_core.exceptions.Unknown: None No status received
```
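One possible mitigation for `Unknown: None No status received` is to reopen the stream at the last row read instead of failing the work item; the BigQuery Storage v1 `ReadRows` request takes an `offset` for this kind of resumption. The client's `ReadRowsStream` (`reader.py`) already reconnects on some error types, but here `Unknown` propagated out of it, so it was evidently not handled there. The sketch below is a generic, stdlib-only illustration, not the transform's actual code: `open_stream(offset)`, `TransientStreamError` and `read_with_resume` are hypothetical, and which gRPC errors are really safe to retry is an assumption to verify.

```python
import time
from typing import Callable, Iterator, Tuple, Type


class TransientStreamError(Exception):
    """Hypothetical stand-in for a retryable error such as google.api_core.exceptions.Unknown."""


def read_with_resume(
    open_stream: Callable[[int], Iterator[object]],
    retryable: Tuple[Type[BaseException], ...] = (TransientStreamError,),
    max_attempts: int = 5,
    base_delay: float = 0.5,
) -> Iterator[object]:
    """Yield rows, reopening the stream at the last offset after a retryable error.

    `open_stream(offset)` is assumed to start reading at row `offset`,
    mirroring the offset field of a BigQuery Storage ReadRows request.
    """
    offset = 0    # number of rows already yielded to the caller
    failures = 0  # consecutive failures without progress
    while True:
        try:
            for row in open_stream(offset):
                yield row
                offset += 1
                failures = 0  # progress was made, so reset the retry budget
            return  # the stream ended normally
        except retryable:
            failures += 1
            if failures >= max_attempts:
                raise  # give up and surface the original error
            time.sleep(base_delay * 2 ** (failures - 1))  # exponential backoff


def flaky_stream_factory(rows, fail_at):
    """Fake stream that raises once when it reaches row index `fail_at`."""
    state = {"failed": False}

    def open_stream(offset):
        for i in range(offset, len(rows)):
            if i == fail_at and not state["failed"]:
                state["failed"] = True
                raise TransientStreamError("No status received")
            yield rows[i]

    return open_stream


if __name__ == "__main__":
    rows = list(read_with_resume(flaky_stream_factory(list("abcdef"), fail_at=3), base_delay=0))
    print(rows)  # ['a', 'b', 'c', 'd', 'e', 'f']
```

Because the read resumes at `offset`, rows already emitted are not re-read. Whether to retry inside the source like this or let the runner retry the whole work item is a design choice.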
Pipeline **V2** ran on Dataflow Runner V2 with these experiment options:
```
['use_runner_v2', 'use_beam_bq_sink', 'shuffle_mode=service', 'use_monitoring_state_manager', 'enable_execution_details_collection', 'use_unified_worker', 'beam_fn_api', 'use_fastavro', 'use_multiple_sdk_containers']
```
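The V1 and V2 runs differ only in runner-related experiments. A quick set comparison of the two lists, copied verbatim from the options above, makes that explicit:

```python
# Experiment lists copied from the V1 and V2 runs.
V1_EXPERIMENTS = {
    "use_beam_bq_sink", "disable_runner_v2", "shuffle_mode=service",
    "use_monitoring_state_manager", "enable_execution_details_collection",
    "use_fastavro",
}
V2_EXPERIMENTS = {
    "use_runner_v2", "use_beam_bq_sink", "shuffle_mode=service",
    "use_monitoring_state_manager", "enable_execution_details_collection",
    "use_unified_worker", "beam_fn_api", "use_fastavro",
    "use_multiple_sdk_containers",
}

only_v1 = sorted(V1_EXPERIMENTS - V2_EXPERIMENTS)
only_v2 = sorted(V2_EXPERIMENTS - V1_EXPERIMENTS)
shared = sorted(V1_EXPERIMENTS & V2_EXPERIMENTS)

print("only V1:", only_v1)  # ['disable_runner_v2']
print("only V2:", only_v2)  # ['beam_fn_api', 'use_multiple_sdk_containers', 'use_runner_v2', 'use_unified_worker']
print("shared: ", shared)
```

Since `use_beam_bq_sink` and `use_fastavro` are shared and both runs end in the same gRPC error, the runner version alone seems an unlikely explanation, although this comparison cannot rule it out.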
Pipeline V2 ran for 10 hours and produced the following stack trace:
```
Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 107, in next
return six.next(self._wrapped)
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
return self._next()
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 706, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "No status received"
debug_error_string = "{"created":"@1627723816.248565623","description":"No status received","file":"src/core/lib/surface/call.cc","file_line":1081,"grpc_status":2}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle
element.data)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 229, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 358, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 828, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/worker/operations.py", line 837, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/usr/local/lib/python3.7/site-packages/terra/beam/io/beam_storage_api.py", line 294, in __next__
self.read_rows_response = next(self.read_rows_iterator, None)
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1/reader.py", line 134, in __iter__
for message in self._wrapped:
File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 110, in next
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "<string>", line 3, in raise_from
google.api_core.exceptions.Unknown: None No status received
```
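Both tracebacks carry a gRPC `debug_error_string` whose `created` field looks like a Unix timestamp (`@<seconds>.<fraction>`). Assuming that reading is correct, decoding the two values shows when the errors happened relative to each other; `created_utc` is a hypothetical helper:

```python
import json
from datetime import datetime, timezone

# debug_error_string values copied from the V1 and V2 tracebacks above.
V1_DEBUG = ('{"created":"@1627723826.888538925","description":"No status received",'
            '"file":"src/core/lib/surface/call.cc","file_line":1081,"grpc_status":2}')
V2_DEBUG = ('{"created":"@1627723816.248565623","description":"No status received",'
            '"file":"src/core/lib/surface/call.cc","file_line":1081,"grpc_status":2}')


def created_utc(debug_error_string: str) -> datetime:
    """Parse the "created" field, assumed to be '@<unix seconds>', as a UTC datetime."""
    created = json.loads(debug_error_string)["created"]
    return datetime.fromtimestamp(float(created.lstrip("@")), tz=timezone.utc)


v1_time = created_utc(V1_DEBUG)
v2_time = created_utc(V2_DEBUG)
print(v1_time.isoformat(timespec="seconds"))  # 2021-07-31T09:30:26+00:00
print(v2_time.isoformat(timespec="seconds"))  # 2021-07-31T09:30:16+00:00
print(round((v1_time - v2_time).total_seconds(), 1))  # 10.6
```

If the two jobs were running concurrently, two independent pipelines failing about 10 seconds apart would hint at a shared, possibly service-side, cause rather than something specific to Runner V1 or V2. This is a correlation only, not a diagnosis.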
There was also a pipeline **V3**, using `SELECT * FROM dataset` with Dataflow Runner V1 and Avro export, which ran for 1 hour.
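To put a rough number on the gap, here is the average throughput implied by each run's wall-clock time. This assumes every run read the full 3.2 TB table (the post does not say whether V1 and V2 finished) and uses decimal units; `throughput_mb_per_s` is a hypothetical helper:

```python
TABLE_TB = 3.2  # size of dataset_latencies

# Wall-clock hours per run. Assumes each run read the whole table.
RUN_HOURS = {"V1": 8.0, "V2": 10.0, "V3": 1.0}


def throughput_mb_per_s(size_tb: float, hours: float) -> float:
    """Average throughput in MB/s, using decimal units (1 TB = 1,000,000 MB)."""
    return size_tb * 1_000_000 / (hours * 3600)


for run, hours in RUN_HOURS.items():
    print(f"{run}: {throughput_mb_per_s(TABLE_TB, hours):.1f} MB/s")
# V1: 111.1 MB/s
# V2: 88.9 MB/s
# V3: 888.9 MB/s
```

Spread over its 50 workers, V1's figure is roughly 2.2 MB/s per worker.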
My questions:
* Do you know what might cause these gRPC exceptions?
* Why might there be such a big difference between the BigQuery Storage API client and the regular QUERY-EXTRACT-READ transform?
Let me know if you have more questions or any suggestions on how to debug the BQ Storage client pipeline further.
--
This is an automated message from the Apache Git Service.