You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/09/10 00:08:00 UTC

[jira] [Work logged] (BEAM-11956) 504 Deadline Exceeded code for very large datasets in Python

     [ https://issues.apache.org/jira/browse/BEAM-11956?focusedWorklogId=648936&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-648936 ]

ASF GitHub Bot logged work on BEAM-11956:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Sep/21 00:07
            Start Date: 10/Sep/21 00:07
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on a change in pull request #15427:
URL: https://github.com/apache/beam/pull/15427#discussion_r705802328



##########
File path: sdks/python/setup.py
##########
@@ -189,7 +189,7 @@ def get_version():
     'google-cloud-bigquery>=1.6.0,<3',
     'google-cloud-core>=0.28.1,<2',
     'google-cloud-bigtable>=0.31.1,<2',
-    'google-cloud-spanner>=1.13.0,<2',
+    'google-cloud-spanner>=3.3.0',

Review comment:
       does it make sense to keep `<4` to avoid 'accidentally' upgrading a major version?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 648936)
    Remaining Estimate: 0h
            Time Spent: 10m

> 504 Deadline Exceeded code for very large datasets in Python
> ------------------------------------------------------------
>
>                 Key: BEAM-11956
>                 URL: https://issues.apache.org/jira/browse/BEAM-11956
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>         Environment: Python 3.8, Apache Beam SDK 2.28, Google Dataflow
>            Reporter: Sebastian Montero
>            Assignee: Sebastian Montero
>            Priority: P3
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> I am building an application in Apache Beam and Python that runs in Google DataFlow. I am using the {{ReadFromSpanner}} method in {{apache_beam.io.gcp.experimental.spannerio}}. This works for most of my Spanner tables but the really large ones that are >16m rows tend to fail due to the following error:
>  Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
>     op.start()
>   File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
>     for row in read_action(element['partitions']):
>   File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in __iter__
>     self._consume_next()
>   File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
>     response = six.next(self._response_iterator)
>   File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
>     for item in iterator:
>   File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']
> From my understanding this error comes from the {{ReadFromSpanner}} operation as it's workers have timed out.
> To solve this I have tried the following:
>  * Changed the {{num_workers}} and {{disk_size_gb}} and added the {{--experiments=shuffle_mode=service}} flag as suggested in [Google's Common error guidance|https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout]
>  * Changed the Machine Type from {{n1-standard-1}} to {{n1-standard-2}} from [here|https://cloud.google.com/compute/docs/machine-types#n1_machine_types]
> My latest code is attached below. I am including {{Transformation}} for simple data wrangling in the rows.
>  """Set pipeline arguments."""
>     options = PipelineOptions(
>         region=RUNNER_REGION,
>         project=RUNNER_PROJECT_ID,
>         runner=RUNNER,
>         temp_location=TEMP_LOCATION,
>         job_name=JOB_NAME,
>         service_account_email=SA_EMAIL,
>         setup_file=SETUP_FILE_PATH,
>         disk_size_gb=500,
>         num_workers=10,
>         machine_type="n1-standard-2",
>         save_main_session=True)
>     """Build and run the pipeline."""
>         with beam.Pipeline(options=options) as p:
>             (p
>              | "Read from Spanner" >> ReadFromSpanner(SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
>              | "Transform elements into dictionary" >> beam.ParDo(Transformation)
>              | "Write new records to BQ" >> WriteToBigQuery(
>                  BIGQUERY_TABLE,
>                  schema=SCHEMA,
>                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
>                  create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
>                  )
> A *potential solution* is to edit the timeout control; I have seen this being available in Java but not in Python. How can I edit timeout control in Python or is there any other solution to this issue?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)