Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 22:30:40 UTC

[GitHub] [beam] damccorm opened a new issue, #21263: (Broken Pipe induced) Bricked Dataflow Pipeline

damccorm opened a new issue, #21263:
URL: https://github.com/apache/beam/issues/21263

   We are seeing Dataflow pipelines get stuck indefinitely. The common theme is a bundle failing with a Broken Pipe error, after which the next bundle is stuck at the `StartBundle` stage (as reported by Dataflow).
   
   Specifically, we see an exception like the following for a bundle (the re-raised exception log is truncated because it is long):
   
    
   ```
   "Error processing instruction process_bundle-7079259598045896145-12555. Original traceback is
   Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "apache_beam/runners/common.py", line 1359, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", line 192, in process
       writer.write(element)
     File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1454, in write
       return self._file_handle.write(self._coder.encode(row) + b'\n')
     File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 200, in write
       self._uploader.put(b)
     File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 661, in put
       self._conn.send_bytes(data.tobytes())
     File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
       self._send_bytes(m[offset:offset + size])
     File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in _send_bytes
       self._send(buf)
     File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   ```
   
   As mentioned above, the next bundle is then stuck at the `StartBundle` stage (as reported by Dataflow), and the progress report thread logs messages like this:
   
    
   ```
   "Operation ongoing for over 10087.60 seconds in state start-msecs in step Assign to Location for POI joins-ptransform-49654  without returning. Current Traceback:
     File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
       self._bootstrap_inner()
     File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
       self.run()
     File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
       self._work_item.run()
     File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
       self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
     File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in task
       lambda: self.create_worker().do_instruction(request), request)
     File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
       response = task()
     File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
       getattr(request, request_type), request.instruction_id)
     File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
       expected_inputs):
     File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 459, in input_elements
       element = received.get(timeout=1)
     File "/usr/local/lib/python3.6/queue.py", line 173, in get
       self.not_empty.wait(remaining)
     File "/usr/local/lib/python3.6/threading.py", line 299, in wait
       gotit = waiter.acquire(True, timeout)
   ```
   
    
   
   *Some details about the Broken Pipe error*
   
   As observed from the logs, the exception is raised at [this line](https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L720). Since the exception is a BrokenPipeError rather than a more general OSError, the connection must have been closed from the other end, i.e. [here](https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L716).
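
The reasoning above can be checked in isolation. The following is a minimal sketch, assuming a POSIX system: it closes the receiving end of a `multiprocessing` pipe and then sends from the other end. The function name `send_after_peer_closed` is made up for illustration and this is not Beam's code. Note that `BrokenPipeError` is itself a subclass of `OSError`, so the specific subclass is what points at a closed peer.

```python
import multiprocessing


def send_after_peer_closed():
    """Return the exception raised when sending to a pipe whose peer is closed."""
    # One-way pipe: `reader` can only receive, `writer` can only send.
    reader, writer = multiprocessing.Pipe(duplex=False)
    # Stand-in for the receiving side closing its end after hitting an error.
    reader.close()
    try:
        writer.send_bytes(b"row\n")
    except OSError as exc:
        return exc
    finally:
        writer.close()
    return None


err = send_after_peer_closed()
print(type(err).__name__, isinstance(err, OSError))
```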
   
   Since it was closed from the other end, there must have been an error in [this](https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L705) try/except block. Searching through the logs does reveal an error in the thread spawned to upload the data sent through the pipe.
   
   The error is as follows:
   
    
   ```
   "Error in _start_upload while inserting file gs://hc-resolution-temp/bq_load_staging/055471a8-bef6-4afb-a850-3a5f9edc43f6/huq-core.enriched_impression_1.impressions/1901c43b-ecd0-49f7-a03a-6d6aa418d36d:
   Traceback (most recent call last):
     File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 649, in _start_upload
       self._client.objects.Insert(self._insert_request, upload=self._upload)
     File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
       upload=upload, upload_config=upload_config)
     File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
       http_request, client=self.client)
     File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
       return self.StreamInChunks()
     File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
       additional_headers=additional_headers)
     File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 971, in __StreamMedia
       self.RefreshResumableUploadState()
     File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 873, in RefreshResumableUploadState
       self.stream.seek(self.progress)
     File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 302, in seek
       (offset, whence, self.position, self.last_block_position))
   NotImplementedError: offset: 169869312, whence: 0, position: 176160768, last: 167772160
   ```
   
    
   
   Specifically, it is [this error](https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/filesystemio.py#L300), raised when seeking to an offset between the last block position and the current position.
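
To make the numbers in the error message concrete, the sketch below converts them to MiB and checks them against a simplified model of the seek rule that the message implies: an absolute seek is accepted only if it targets the current position or the start of the last block. The function `seek_allowed` and that rule are assumptions for illustration, not a copy of Beam's implementation.

```python
MIB = 1024 * 1024

# Values copied from the NotImplementedError message in the log.
offset = 169869312
position = 176160768
last_block_position = 167772160


def seek_allowed(offset, position, last_block_position):
    """Hypothetical model: only two absolute seek targets are accepted."""
    return offset in (position, last_block_position)


# Express each value in MiB to see where the requested offset falls.
print(offset // MIB, position // MIB, last_block_position // MIB)
# The requested offset lies strictly inside the last block.
print(last_block_position < offset < position)
print(seek_allowed(offset, position, last_block_position))
```

In other words, the last block starts at 160 MiB, the stream has advanced to 168 MiB, and the upload library asked to resume from 162 MiB, which is 2 MiB into the last block.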
   
    
   
   My thoughts:
    * Why is the pipeline stalled at the `StartBundle` stage after such a Broken Pipe error? Does it have something to do with the uploader thread?
    * Could seeking to an offset between the last block position and the current position be implemented without significant repercussions?
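
As a way to think about the second question, here is a minimal sketch of a sequential stream that retains only its most recent block, so that an absolute seek anywhere inside that block can be served by replaying buffered bytes. The class `LastBlockReplayStream` is hypothetical and is not Beam's stream class. It only illustrates the idea and ignores threading and the pipe-based data source.

```python
import io
import os


class LastBlockReplayStream:
    """Hypothetical read-only stream that can rewind within its last block.

    `source` is any binary file-like object that supports sequential reads.
    """

    def __init__(self, source):
        self._source = source
        self.position = 0             # offset the next read() starts from
        self.last_block_position = 0  # offset where the retained block starts
        self._last_block = b""        # bytes of the most recently fetched block

    def _buffered_end(self):
        return self.last_block_position + len(self._last_block)

    def read(self, size):
        if self.position < self._buffered_end():
            # Replay bytes that were already handed out once (may be a short read).
            start = self.position - self.last_block_position
            data = self._last_block[start:start + size]
        else:
            # Fetch a fresh block and retain it for possible replays.
            data = self._source.read(size)
            self.last_block_position = self.position
            self._last_block = data
        self.position += len(data)
        return data

    def seek(self, offset, whence=os.SEEK_SET):
        in_last_block = self.last_block_position <= offset <= self._buffered_end()
        if whence == os.SEEK_SET and in_last_block:
            self.position = offset
            return
        raise NotImplementedError(
            "offset: %s, whence: %s, position: %s, last: %s"
            % (offset, whence, self.position, self.last_block_position))


stream = LastBlockReplayStream(io.BytesIO(b"abcdefghij"))
first = stream.read(4)     # first block, offsets 0..3
second = stream.read(4)    # second block, offsets 4..7 (now the retained block)
stream.seek(6)             # a seek into the middle of the retained block
replayed = stream.read(4)  # replays the tail of the retained block
rest = stream.read(4)      # continues with fresh data from the source
print(first, second, replayed, rest)
```

The cost of this approach is holding one block in memory per stream. In the logged failure the gap between `last` and `position` is 8 MiB. Whether that is acceptable, and whether it interacts safely with the uploader thread, would need to be checked against the real implementation.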
   
    
   
   The same issue, with pictures attached, is [here](https://docs.google.com/document/d/13ccydJSpfdUU_czHg5NeP2_LPJh8E2NZknShVAemDYs/edit?usp=sharing)
   
   Imported from Jira [BEAM-12843](https://issues.apache.org/jira/browse/BEAM-12843). Original Jira may contain additional context.
   Reported by: ryantam626.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on issue #21263: (Broken Pipe induced) Bricked Dataflow Pipeline

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #21263:
URL: https://github.com/apache/beam/issues/21263#issuecomment-1246017170

   Since this hasn't got an update or a "me too" in a long time, I am going to close it on the assumption that it was resolved or transient. It was seemingly specific to a particular customer/pipeline so most effectively debugged via user support unless we had a repro. Feel free to reopen if it happens more, or if you have more information or a reproduction.




[GitHub] [beam] kennknowles closed issue #21263: (Broken Pipe induced) Bricked Dataflow Pipeline

Posted by GitBox <gi...@apache.org>.
kennknowles closed issue #21263: (Broken Pipe induced) Bricked Dataflow Pipeline 
URL: https://github.com/apache/beam/issues/21263




[GitHub] [beam] ewianda commented on issue #21263: (Broken Pipe induced) Bricked Dataflow Pipeline

Posted by GitBox <gi...@apache.org>.
ewianda commented on issue #21263:
URL: https://github.com/apache/beam/issues/21263#issuecomment-1354920391

   me too

