You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 20:42:59 UTC

[GitHub] [beam] damccorm opened a new issue, #20940: 504 Deadline Exceeded code for very large datasets in Python

damccorm opened a new issue, #20940:
URL: https://github.com/apache/beam/issues/20940

   I am building an application in Apache Beam and Python that runs in Google DataFlow. I am using the `ReadFromSpanner` method in `apache_beam.io.gcp.experimental.spannerio`. This works for most of my Spanner tables, but the really large ones (>16m rows) tend to fail with the following error:
    Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
       work_executor.execute()
     File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
       op.start()
     File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
     File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
     File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
     File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
     File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
     File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
     File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
     File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
       raise exc.with_traceback(traceback)
     File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
       for row in read_action(element['partitions']):
     File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in __iter__
       self._consume_next()
     File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
       response = six.next(self._response_iterator)
     File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
       for item in iterator:
     File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
       six.raise_from(exceptions.from_grpc_error(exc), exc)
     File "<string>", line 3, in raise_from
   google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']
   From my understanding, this error comes from the `ReadFromSpanner` operation, as its workers have timed out.
   
   To solve this I have tried the following:
    * Changed the `num_workers` and `disk_size_gb` and added the `--experiments=shuffle_mode=service` flag as suggested in [Google's Common error guidance](https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout)
    * Changed the Machine Type from `n1-standard-1` to `n1-standard-2` from [here](https://cloud.google.com/compute/docs/machine-types#n1_machine_types)
   
   My latest code is attached below. I am including `Transformation` for simple data wrangling in the rows.
    """Set pipeline arguments."""
       options = PipelineOptions(
           region=RUNNER_REGION,
           project=RUNNER_PROJECT_ID,
           runner=RUNNER,
           temp_location=TEMP_LOCATION,
           job_name=JOB_NAME,
           service_account_email=SA_EMAIL,
           setup_file=SETUP_FILE_PATH,
           disk_size_gb=500,
           num_workers=10,
           machine_type="n1-standard-2",
           save_main_session=True)
   
       """Build and run the pipeline."""
           with beam.Pipeline(options=options) as p:
               (p
                | "Read from Spanner" \>\> ReadFromSpanner(SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
                | "Transform elements into dictionary" \>\> beam.ParDo(Transformation)
                | "Write new records to BQ" \>\> WriteToBigQuery(
                    BIGQUERY_TABLE,
                    schema=SCHEMA,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
                    )
   A *potential solution* is to edit the timeout control; I have seen that this is available in Java but not in Python. How can I edit the timeout control in Python, or is there another solution to this issue?
   
   Imported from Jira [BEAM-11956](https://issues.apache.org/jira/browse/BEAM-11956). Original Jira may contain additional context.
   Reported by: sebastian-montero.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org