Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 21:01:50 UTC

[GitHub] [beam] damccorm opened a new issue, #21065: Python GCSIO upload does not retry

damccorm opened a new issue, #21065:
URL: https://github.com/apache/beam/issues/21065

   I have a streaming pipeline running in Dataflow which is loading data to Google Cloud Storage. I get sporadic errors like "Error in _start_upload while inserting file ...". The underlying issue seems to be that no retry logic is applied in the method *_start_upload* [here](https://github.com/apache/beam/blob/v2.30.0/sdks/python/apache_beam/io/gcp/gcsio.py#L639).
   
   There is even a TODO stating the need for it:
   ```
   # TODO(silviuc): Refactor so that retry logic can be applied.
   # There is retry logic in the underlying transfer library but we should make
   # it more explicit so we can control the retry parameters.
   @retry.no_retries  # Using no_retries marks this as an integration point.
   def _start_upload(self):
   ```
   
    
   
   All the other methods [in the same module](https://github.com/apache/beam/blob/v2.30.0/sdks/python/apache_beam/io/gcp/gcsio.py) have this backoff implementation:
   
    
   ```
   
   @retry.with_exponential_backoff(
    retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   ```
   
    
   
   This matches what the [Google Cloud Storage docs suggest](https://cloud.google.com/storage/docs/retry-strategy).
   
   Is there any potential problem with simply adding the same backoff implementation to the *_start_upload* method?
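
To make the question concrete, here is a minimal, self-contained sketch of what such a backoff wrapper does. It is not Beam's implementation: `FakeHttpError`, `is_server_error`, `with_exponential_backoff` (as defined here) and `FlakyUploader` are hypothetical stand-ins written for illustration only, and the delays and retry count are arbitrary. The sketch does not model whether a partially consumed upload stream can be replayed, which may matter for the real `_start_upload`.

```python
import functools
import time


class FakeHttpError(Exception):
    """Hypothetical stand-in for an HTTP error that carries a status code."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


def is_server_error(exc):
    """Retry filter: only 5xx responses are considered transient."""
    return isinstance(exc, FakeHttpError) and exc.status >= 500


def with_exponential_backoff(num_retries=5, initial_delay=0.01, factor=2.0,
                             retry_filter=is_server_error, sleep=time.sleep):
    """Retry the wrapped call, multiplying the wait by `factor` each time."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(num_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    # Give up on the last attempt or on non-retryable errors.
                    if attempt == num_retries or not retry_filter(exc):
                        raise
                    sleep(delay)
                    delay *= factor

        return wrapper

    return decorator


class FlakyUploader:
    """Hypothetical uploader that fails `failures` times, then succeeds."""

    def __init__(self, failures, status=503):
        self.failures = failures
        self.status = status
        self.calls = 0

    # sleep is replaced by a no-op so the example runs instantly.
    @with_exponential_backoff(sleep=lambda _delay: None)
    def start_upload(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise FakeHttpError(self.status)
        return "uploaded"
```

With this sketch, `FlakyUploader(failures=2).start_upload()` succeeds on the third call, while an uploader that raises a 404 fails immediately because the filter only accepts server errors.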
   
    
   
   It's difficult to state the rate at which these errors occur, since it's a backend issue that is not properly handled in the code. In my case, for a pipeline that constantly handles loads of events, there were 5 occurrences in the last 15 days. However, even if the rate is low, my main concern is that when these errors are thrown on a resumable upload, since there is no retry strategy, *I'm just losing that data, right*?
   
   If I'm wrong, I'd love to learn why, what's actually happening, and what I'm missing. If I'm right, it means there is potential data loss, and the priority for this should be raised?
   
    
   
   In my case I'm using Dataflow with Apache Beam 2.28, but judging by the code in the other versions, the problem would be the same.
   
    
   
   The piece of code where this is happening is this:
   
    
   ```
   from apache_beam.io.fileio import WriteToFiles
   ...
   | "Write to GCS" >> WriteToFiles(
       path=output_path,
       shards=1,
       max_writers_per_bundle=0,
       destination=lambda record: record['topic_kafka'],
       sink=JsonSink(),
       file_naming=destination_partitioning_naming(extension="json", topics=topics)
   ) )
   ```
   
    
   
   **EDIT:**
   I got an answer to a Stack Overflow question I asked:
   [https://stackoverflow.com/questions/67972758/apache-beam-python-gscio-upload-method-has-retry-no-retries-implemented-causes/67975695#67975695](https://stackoverflow.com/questions/67972758/apache-beam-python-gscio-upload-method-has-retry-no-retries-implemented-causes/67975695#67975695)
   
   Referencing this doc:
   [https://cloud.google.com/dataflow/docs/resources/faq#how-are-java-exceptions-handled-in-cloud-dataflow](https://cloud.google.com/dataflow/docs/resources/faq#how-are-java-exceptions-handled-in-cloud-dataflow)
   
   
   
   It makes sense that, since Dataflow retries work items, the code itself doesn't need its own retry logic. Still, is there any problem with applying @retry.with_exponential_backoff(..) to the "_start_upload" method? I guess it would at least be cleaner.
   
    
   
   Here [1] is the full stacktrace of the error that appears in the main Dataflow page.
   
   [1]
   2021-06-10 16:58:55.104 CEST
   Error message from worker: generic::unknown: Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
       writer.close()
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
       self._uploader.finish()
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
       raise self._upload_thread.last_error # pylint: disable=raising-bad-type
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
       self._client.objects.Insert(self._insert_request, upload=self._upload)
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
       return self._RunMethod(
     File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
       return self.ProcessHttpResponse(method_config, http_response, request)
     File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
       self.__ProcessHttpResponse(method_config, http_response, request))
     File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
       raise exceptions.HttpError.FromResponse(
   apitools.base.py.exceptions.HttpError: HttpError accessing <[https://www.googleapis.com/resumable/upload/storage/v1/b/](https://www.googleapis.com/resumable/upload/storage/v1/b/)<bucket-name\>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<uploadid\>\>: response: <\{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id\>, 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}\>, content <\>

   During handling of the above exception, another exception occurred:

   Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
       response = task()
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda\>
       lambda: self.create_worker().do_instruction(request), request)
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
       return getattr(self, request_type)(
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 999, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
     File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
       self.output(decoded_value)
     File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
       raise exc.with_traceback(traceback)
     File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
       writer.close()
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
       self._uploader.finish()
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
       raise self._upload_thread.last_error # pylint: disable=raising-bad-type
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
       self._client.objects.Insert(self._insert_request, upload=self._upload)
     File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
       return self._RunMethod(
     File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
       return self.ProcessHttpResponse(method_config, http_response, request)
     File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
       self.__ProcessHttpResponse(method_config, http_response, request))
     File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
       raise exceptions.HttpError.FromResponse(
   RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <[https://www.googleapis.com/resumable/upload/storage/v1/b/](https://www.googleapis.com/resumable/upload/storage/v1/b/)<bucket-name\>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<id\>\>: response: <\{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id\>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}\>, content <\> [while running 'Write to GCS/ParDo(_WriteShardedRecordsFn)-ptransform-50705']
   passed through: \==\> dist_proc/dax/workflow/worker/fnapi_service.cc:631
   
   Imported from Jira [BEAM-12489](https://issues.apache.org/jira/browse/BEAM-12489). Original Jira may contain additional context.
   Reported by: VictorGea.

