You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 20:42:59 UTC
[GitHub] [beam] damccorm opened a new issue, #20940: 504 Deadline Exceeded code for very large datasets in Python
damccorm opened a new issue, #20940:
URL: https://github.com/apache/beam/issues/20940
I am building an application in Apache Beam and Python that runs in Google Dataflow. I am using the `ReadFromSpanner` method in `apache_beam.io.gcp.experimental.spannerio`. This works for most of my Spanner tables, but the really large ones (>16M rows) tend to fail with the following error:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
for row in read_action(element['partitions']):
File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in __iter__
self._consume_next()
File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
response = six.next(self._response_iterator)
File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
for item in iterator:
File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "<string>", line 3, in raise_from
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']
From my understanding, this error comes from the `ReadFromSpanner` operation, as its workers have timed out.
To solve this I have tried the following:
* Changed the `num_workers` and `disk_size_gb` and added the `--experiments=shuffle_mode=service` flag as suggested in [Google's Common error guidance](https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout)
* Changed the Machine Type from `n1-standard-1` to `n1-standard-2` from [here](https://cloud.google.com/compute/docs/machine-types#n1_machine_types)
My latest code is attached below. I am including `Transformation` for simple data wrangling in the rows.
"""Set pipeline arguments."""
options = PipelineOptions(
region=RUNNER_REGION,
project=RUNNER_PROJECT_ID,
runner=RUNNER,
temp_location=TEMP_LOCATION,
job_name=JOB_NAME,
service_account_email=SA_EMAIL,
setup_file=SETUP_FILE_PATH,
disk_size_gb=500,
num_workers=10,
machine_type="n1-standard-2",
save_main_session=True)
"""Build and run the pipeline."""
with beam.Pipeline(options=options) as p:
(p
| "Read from Spanner" >> ReadFromSpanner(SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
| "Transform elements into dictionary" >> beam.ParDo(Transformation)
| "Write new records to BQ" >> WriteToBigQuery(
BIGQUERY_TABLE,
schema=SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
A *potential solution* is to edit the timeout control; I have seen this being available in Java but not in Python. How can I edit timeout control in Python or is there any other solution to this issue?
Imported from Jira [BEAM-11956](https://issues.apache.org/jira/browse/BEAM-11956). Original Jira may contain additional context.
Reported by: sebastian-montero.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org