Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/10/24 17:25:01 UTC

[jira] [Commented] (BEAM-11956) 504 Deadline Exceeded code for very large datasets in Python

    [ https://issues.apache.org/jira/browse/BEAM-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433470#comment-17433470 ] 

Beam JIRA Bot commented on BEAM-11956:
--------------------------------------

This issue is assigned but has not received an update in 30 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.

> 504 Deadline Exceeded code for very large datasets in Python
> ------------------------------------------------------------
>
>                 Key: BEAM-11956
>                 URL: https://issues.apache.org/jira/browse/BEAM-11956
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>         Environment: Python 3.8, Apache Beam SDK 2.28, Google Dataflow
>            Reporter: Sebastian Montero
>            Assignee: Sebastian Montero
>            Priority: P3
>              Labels: stale-assigned
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> I am building an Apache Beam application in Python that runs on Google Dataflow. I am using the {{ReadFromSpanner}} transform in {{apache_beam.io.gcp.experimental.spannerio}}. This works for most of my Spanner tables, but the very large ones (more than 16 million rows) tend to fail with the following error:
>  Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
>     op.start()
>   File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
>     for row in read_action(element['partitions']):
>   File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in __iter__
>     self._consume_next()
>   File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
>     response = six.next(self._response_iterator)
>   File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
>     for item in iterator:
>   File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']
> From my understanding, this error comes from the {{ReadFromSpanner}} operation because its workers have timed out.
> To solve this I have tried the following:
>  * Changed the {{num_workers}} and {{disk_size_gb}} and added the {{--experiments=shuffle_mode=service}} flag as suggested in [Google's Common error guidance|https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout]
>  * Changed the machine type from {{n1-standard-1}} to {{n1-standard-2}} (see the [N1 machine types|https://cloud.google.com/compute/docs/machine-types#n1_machine_types])
> My latest code is below. The {{Transformation}} step performs simple data wrangling on the rows.
> import apache_beam as beam
> from apache_beam.io.gcp.bigquery import WriteToBigQuery
> from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # Set pipeline arguments.
> options = PipelineOptions(
>     region=RUNNER_REGION,
>     project=RUNNER_PROJECT_ID,
>     runner=RUNNER,
>     temp_location=TEMP_LOCATION,
>     job_name=JOB_NAME,
>     service_account_email=SA_EMAIL,
>     setup_file=SETUP_FILE_PATH,
>     disk_size_gb=500,
>     num_workers=10,
>     machine_type="n1-standard-2",
>     save_main_session=True)
>
> # Build and run the pipeline.
> with beam.Pipeline(options=options) as p:
>     (p
>      | "Read from Spanner" >> ReadFromSpanner(
>          SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
>      | "Transform elements into dictionary" >> beam.ParDo(Transformation)
>      | "Write new records to BQ" >> WriteToBigQuery(
>          BIGQUERY_TABLE,
>          schema=SCHEMA,
>          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
>          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
>      )
> A *potential solution* is to adjust the read timeout. I have seen this option in the Java SDK but not in the Python one. How can I change the timeout in Python, or is there another way to solve this issue?
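
One possible workaround, sketched here under stated assumptions and not confirmed as a fix for this issue, is to split the single large query into several smaller range queries so that each read covers fewer rows. The sketch assumes the table has an integer key column; the names `key_ranges`, `chunked_queries`, `big_table`, and `id` are hypothetical and do not come from the report.

```python
from typing import Iterator, List, Tuple


def key_ranges(min_key: int, max_key: int, chunk_size: int) -> Iterator[Tuple[int, int]]:
    """Yield half-open [start, end) ranges that cover min_key..max_key inclusive."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    start = min_key
    while start <= max_key:
        # Clamp the last range so it ends just past max_key.
        end = min(start + chunk_size, max_key + 1)
        yield start, end
        start = end


def chunked_queries(table: str, key_column: str, min_key: int,
                    max_key: int, chunk_size: int) -> List[str]:
    """Build one SELECT statement per key range (hypothetical table and column)."""
    return [
        f"SELECT * FROM {table} WHERE {key_column} >= {start} AND {key_column} < {end}"
        for start, end in key_ranges(min_key, max_key, chunk_size)
    ]


if __name__ == "__main__":
    # Print the range queries for a small illustrative key space.
    for query in chunked_queries("big_table", "id", 0, 9, 4):
        print(query)
```

Each generated string could then be passed as the `sql` argument of its own `ReadFromSpanner` step, with the resulting collections merged by `beam.Flatten()`. Whether smaller reads avoid the `DeadlineExceeded` error depends on the table and the query, so this is a starting point to test and not a guaranteed fix.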


