Posted to issues@beam.apache.org by "Brian Hulette (Jira)" <ji...@apache.org> on 2021/09/09 16:55:00 UTC

[jira] [Commented] (BEAM-12843) (Broken Pipe induced) Bricked Dataflow Pipeline 

    [ https://issues.apache.org/jira/browse/BEAM-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412702#comment-17412702 ] 

Brian Hulette commented on BEAM-12843:
--------------------------------------

[~chamikara] could you help route this?

> (Broken Pipe induced) Bricked Dataflow Pipeline 
> ------------------------------------------------
>
>                 Key: BEAM-12843
>                 URL: https://issues.apache.org/jira/browse/BEAM-12843
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp, runner-dataflow
>    Affects Versions: 2.31.0
>            Reporter: Ryan Tam
>            Priority: P1
>
> We are seeing Dataflow pipelines getting stuck indefinitely. The common theme of this behaviour is a bundle failing with a Broken Pipe error, after which the next bundle is stuck at the `StartBundle` stage (as reported by Dataflow).
> Specifically, we see an exception like the following for a bundle (the re-raised exception log is truncated because it is long):
>  
> {code:java}
> "Error processing instruction process_bundle-7079259598045896145-12555. Original traceback is
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1359, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", line 192, in process
>     writer.write(element)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1454, in write
>     return self._file_handle.write(self._coder.encode(row) + b'\n')
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 200, in write
>     self._uploader.put(b)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 661, in put
>     self._conn.send_bytes(data.tobytes())
>   File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
>     self._send_bytes(m[offset:offset + size])
>   File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in _send_bytes
>     self._send(buf)
>   File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in _send
>     n = write(self._handle, buf)
> BrokenPipeError: [Errno 32] Broken pipe
> {code}
> As previously mentioned, the next bundle is then stuck at the `StartBundle` stage (as reported by Dataflow), and the progress-report thread logs messages like these:
>  
> {code:java}
> "Operation ongoing for over 10087.60 seconds in state start-msecs in step Assign to Location for POI joins-ptransform-49654  without returning. Current Traceback:
>   File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
>     self._bootstrap_inner()
>   File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
>     self._work_item.run()
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
>     self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in task
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
>     response = task()
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
>     expected_inputs):
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 459, in input_elements
>     element = received.get(timeout=1)
>   File "/usr/local/lib/python3.6/queue.py", line 173, in get
>     self.not_empty.wait(remaining)
>   File "/usr/local/lib/python3.6/threading.py", line 299, in wait
>     gotit = waiter.acquire(True, timeout)
> {code}
>  
> *Some details about the Broken Pipe error*
> As observed from the logs, the exception comes from [this line|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L720]. Since the exception is a BrokenPipeError rather than an OSError, the connection must have been closed from the other end, i.e. [here|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L716].
> Because the pipe is closed from the other end, there must have been an error inside [this|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L705] try/except block. Searching through the logs does indeed reveal an error in the thread spawned to upload whatever data is sent through the pipe.
> The error is as follows:
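> This closed-pipe behaviour can be reproduced in isolation. Below is a minimal, self-contained sketch (not Beam code; the `uploader` function and its simulated failure are made up for illustration) of the writer/uploader structure described above: the writer sends chunks over a multiprocessing pipe to an uploader thread, and once that thread hits an error and closes its end of the pipe, the writer's next `send_bytes()` surfaces only a BrokenPipeError rather than the original upload error.
>  
> {code:python}
> import multiprocessing
> import threading
> 
> 
> def uploader(child_conn):
>     """Stands in for the GCS upload thread: consume one chunk, then fail."""
>     try:
>         child_conn.recv_bytes()  # consume the first chunk
>         raise NotImplementedError('simulated upload failure (e.g. a bad seek)')
>     except Exception as exc:  # the real upload thread logs this traceback
>         print('uploader thread failed:', exc)
>     finally:
>         child_conn.close()  # closing the receiving end of the pipe ...
> 
> 
> parent_conn, child_conn = multiprocessing.Pipe()
> thread = threading.Thread(target=uploader, args=(child_conn,), daemon=True)
> thread.start()
> 
> parent_conn.send_bytes(b'first chunk')  # accepted while the uploader is alive
> thread.join()  # the uploader has now failed and closed its end
> 
> try:
>     parent_conn.send_bytes(b'second chunk')
> except BrokenPipeError as exc:
>     # ... means the writer only ever sees "[Errno 32] Broken pipe".
>     print('writer sees:', exc)
> {code}
>  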
>  
> {code:java}
> "Error in _start_upload while inserting file gs://hc-resolution-temp/bq_load_staging/055471a8-bef6-4afb-a850-3a5f9edc43f6/huq-core.enriched_impression_1.impressions/1901c43b-ecd0-49f7-a03a-6d6aa418d36d: Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 649, in _start_upload
>     self._client.objects.Insert(self._insert_request, upload=self._upload)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
>     upload=upload, upload_config=upload_config)
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
>     http_request, client=self.client)
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
>     return self.StreamInChunks()
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
>     additional_headers=additional_headers)
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 971, in __StreamMedia
>     self.RefreshResumableUploadState()
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 873, in RefreshResumableUploadState
>     self.stream.seek(self.progress)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 302, in seek
>     (offset, whence, self.position, self.last_block_position))
> NotImplementedError: offset: 169869312, whence: 0, position: 176160768, last: 167772160
> {code}
>  
> Specifically, it is [this error|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/filesystemio.py#L300], raised when seeking to an offset that lies between the last block position and the current position.
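> In other words, the pipe-backed upload stream buffers only the most recently read block, so it accepts a seek only to its current position or back to the start of that buffered block. A hypothetical check (not the actual Beam code), fed with the values from the error above:
>  
> {code:python}
> def seek_allowed(offset, position, last_block_position):
>     """Only the current position and the start of the buffered
>     last block can be seeked to; anything else is rejected."""
>     return offset in (position, last_block_position)
> 
> # Values from the NotImplementedError above: the resumable-upload refresh asks
> # to rewind to an offset strictly inside the last block, which is rejected.
> print(seek_allowed(offset=169869312,
>                    position=176160768,
>                    last_block_position=167772160))  # -> False
> {code}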
>  
> My thoughts:
>  * Why is the pipeline stalled at the `StartBundle` stage after such a Broken Pipe error? Is this something to do with the uploader thread?
>  * Could seeking to an offset between the last block position and the current position be implemented without significant repercussions? (A rough sketch of the idea follows after this list.)
>  
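> Since the whole last block is still buffered in memory, a seek to any offset in `[last_block_position, position)` could in principle be served by "un-reading" only the tail of that buffer. Below is a rough sketch of that idea on a toy class (not Beam's PipeStream; the field names `last_block` and `rest_of_last_block` are assumptions for illustration):
>  
> {code:python}
> class BufferedPipeStreamSketch(object):
>     """Toy model of a pipe-backed stream that can rewind into its last block."""
> 
>     def __init__(self):
>         self.position = 0
>         self.last_block_position = 0
>         self.last_block = b''          # most recently read block
>         self.rest_of_last_block = b''  # bytes pushed back by seek()
> 
>     def _record_block(self, block):
>         """Would be called from read(): remember the block for rewinds."""
>         self.last_block_position = self.position
>         self.last_block = block
>         self.position += len(block)
> 
>     def seek(self, offset, whence=0):
>         if whence != 0:
>             raise NotImplementedError('only absolute seeks in this sketch')
>         if offset == self.position:
>             return offset  # no-op
>         if self.last_block_position <= offset < self.position:
>             # Instead of rejecting the seek, re-queue the unread tail of the
>             # buffered block; a subsequent read() would serve these bytes
>             # before pulling new data from the pipe.
>             self.rest_of_last_block = self.last_block[offset - self.last_block_position:]
>             self.position = offset
>             return offset
>         raise NotImplementedError(
>             'offset: %s, position: %s, last: %s' %
>             (offset, self.position, self.last_block_position))
> {code}
> This still could not satisfy a rewind to anywhere before the last buffered block, so whether the resumable-upload retry ever requests that would determine the repercussions asked about above.
>  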
> The same issue, with a picture attached, is described [here|https://docs.google.com/document/d/13ccydJSpfdUU_czHg5NeP2_LPJh8E2NZknShVAemDYs/edit?usp=sharing].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)