Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/10/24 17:25:01 UTC

[jira] [Commented] (BEAM-12843) (Broken Pipe induced) Bricked Dataflow Pipeline 

    [ https://issues.apache.org/jira/browse/BEAM-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433468#comment-17433468 ] 

Beam JIRA Bot commented on BEAM-12843:
--------------------------------------

This issue is assigned but has not received an update in 30 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.

> (Broken Pipe induced) Bricked Dataflow Pipeline 
> ------------------------------------------------
>
>                 Key: BEAM-12843
>                 URL: https://issues.apache.org/jira/browse/BEAM-12843
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp, runner-dataflow
>    Affects Versions: 2.31.0
>            Reporter: Ryan Tam
>            Assignee: Chamikara Madhusanka Jayalath
>            Priority: P1
>              Labels: stale-assigned
>
> We are seeing Dataflow pipelines get stuck indefinitely. The common pattern is that a bundle fails with a Broken Pipe error and the next bundle then gets stuck at the `StartBundle` stage (as reported by Dataflow).
> Specifically, we see an exception like the following for a bundle (the re-raised exception log is truncated because it is long):
>  
> {code:java}
> "Error processing instruction process_bundle-7079259598045896145-12555. Original traceback is
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1359, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", line 192, in process
>     writer.write(element)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1454, in write
>     return self._file_handle.write(self._coder.encode(row) + b'\n')
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 200, in write
>     self._uploader.put(b)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 661, in put
>     self._conn.send_bytes(data.tobytes())
>   File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
>     self._send_bytes(m[offset:offset + size])
>   File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in _send_bytes
>     self._send(buf)
>   File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in _send
>     n = write(self._handle, buf)
> BrokenPipeError: [Errno 32] Broken pipe
> {code}
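In Python 3, `BrokenPipeError` is a subclass of `OSError` (via `ConnectionError`) that is raised for errno `EPIPE`. The standalone sketch below triggers the same error as the traceback above. It closes one end of a `multiprocessing.Pipe()` and then writes to the other end. It assumes a POSIX system such as a Linux worker, and it only illustrates the failure mode; it is not Beam code.

```python
import errno
import multiprocessing

# On Linux the default duplex Pipe() is backed by a socket pair, and
# Connection.send_bytes() ends up in os.write(), as in the traceback above.
writer_end, reader_end = multiprocessing.Pipe()

# Simulate the receiving end being closed before the writer sends more data.
reader_end.close()

caught = None
try:
    writer_end.send_bytes(b'{"row": 1}\n')
except BrokenPipeError as exc:
    caught = exc

print(type(caught).__name__)          # BrokenPipeError
print(caught.errno == errno.EPIPE)    # True
print(isinstance(caught, OSError))    # True
writer_end.close()
```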
> As mentioned above, the next bundle then gets stuck at the `StartBundle` stage (as reported by Dataflow), and the progress report thread logs messages like the following:
>  
> {code:java}
> "Operation ongoing for over 10087.60 seconds in state start-msecs in step Assign to Location for POI joins-ptransform-49654  without returning. Current Traceback:
>   File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
>     self._bootstrap_inner()
>   File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
>     self._work_item.run()
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
>     self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in task
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
>     response = task()
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
>     expected_inputs):
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 459, in input_elements
>     element = received.get(timeout=1)
>   File "/usr/local/lib/python3.6/queue.py", line 173, in get
>     self.not_empty.wait(remaining)
>   File "/usr/local/lib/python3.6/threading.py", line 299, in wait
>     gotit = waiter.acquire(True, timeout)
> {code}
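The traceback shows the bundle processor thread parked in a timed `received.get(timeout=1)` poll inside `input_elements` in `data_plane.py`. In other words, it is waiting for input elements. The standalone sketch below reproduces that shape without any Beam dependency; the function names are hypothetical. It dumps another thread's stack with `sys._current_frames()`, which is one way to obtain a "Current Traceback" like the one logged. The sketch illustrates what the log shows; it is not a diagnosis of why the input never arrives.

```python
import queue
import sys
import threading
import time
import traceback


def wait_for_elements(received, stop):
    """Hypothetical stand-in for the polling loop seen in the log.

    Nothing ever puts to `received`, so the thread keeps waiting in
    Queue.get(timeout=1) until `stop` is set.
    """
    while not stop.is_set():
        try:
            return received.get(timeout=1)
        except queue.Empty:
            continue
    return None


def dump_thread_stack(thread):
    """Return another thread's current stack as formatted text."""
    frame = sys._current_frames().get(thread.ident)
    return "".join(traceback.format_stack(frame)) if frame is not None else ""


received = queue.Queue()
stop = threading.Event()
worker = threading.Thread(target=wait_for_elements, args=(received, stop), daemon=True)
worker.start()

time.sleep(0.5)                    # let the worker block inside Queue.get
stack = dump_thread_stack(worker)
print(stack)                       # innermost frames: queue.py get, threading.py wait

stop.set()                         # the worker exits within one polling interval
worker.join()
```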
>  
> *Some details about the Broken Pipe error*
> As observed from the logs, the exception is raised at [this line|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L720]. Since the exception is a BrokenPipeError rather than some other OSError, the connection must have been closed from the other end, i.e. [here|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L716].
> Since it was closed from the other end, there must have been an error in [this|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L705] try/except block. Searching through the logs does reveal an error in the thread spawned to upload the data sent through the pipe.
> The error is as follows:
>  
> {code:java}
> "Error in _start_upload while inserting file gs://hc-resolution-temp/bq_load_staging/055471a8-bef6-4afb-a850-3a5f9edc43f6/huq-core.enriched_impression_1.impressions/1901c43b-ecd0-49f7-a03a-6d6aa418d36d: Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 649, in _start_upload
>     self._client.objects.Insert(self._insert_request, upload=self._upload)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
>     upload=upload, upload_config=upload_config)
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
>     http_request, client=self.client)
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
>     return self.StreamInChunks()
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
>     additional_headers=additional_headers)
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 971, in __StreamMedia
>     self.RefreshResumableUploadState()
>   File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 873, in RefreshResumableUploadState
>     self.stream.seek(self.progress)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 302, in seek
>     (offset, whence, self.position, self.last_block_position))
> NotImplementedError: offset: 169869312, whence: 0, position: 176160768, last: 167772160
> {code}
>  
> Specifically, it is [this error|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/filesystemio.py#L300], raised when seeking to an offset between the last block position and the current position.
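The chain described above can be reproduced with a small writer/consumer pair: the upload thread fails, its end of the pipe is closed, and the writer's next `send_bytes` hits `BrokenPipeError`. The sketch below is hypothetical and is not Beam's `GcsUploader`; `ThreadedUploader`, `consume` and `failing_consume` are illustrative names. It also shows one possible way to make the writer re-raise the consumer's recorded exception, chained to the `BrokenPipeError`. With that change, the bundle failure would show the root cause instead of only "Broken pipe".

```python
import multiprocessing
import threading


class ThreadedUploader:
    """Hypothetical writer/consumer pair connected by a multiprocessing.Pipe.

    put() sends chunks from the caller's thread; a background thread passes
    each chunk to `consume`. If `consume` raises, the thread records the
    exception in `last_error` and closes its end of the pipe.
    """

    def __init__(self, consume):
        self._conn, self._child_conn = multiprocessing.Pipe()
        self.last_error = None
        self.thread = threading.Thread(target=self._run, args=(consume,), daemon=True)
        self.thread.start()

    def _run(self, consume):
        try:
            while True:
                consume(self._child_conn.recv_bytes())
        except EOFError:
            pass                          # writer closed its end: normal finish
        except Exception as exc:          # keep the real cause for the writer
            self.last_error = exc
        finally:
            self._child_conn.close()      # the writer now sees a closed peer

    def put(self, data):
        try:
            self._conn.send_bytes(data)
        except BrokenPipeError as pipe_error:
            # Surface the consumer's exception, chained to the pipe error,
            # instead of only "[Errno 32] Broken pipe".
            self.thread.join()
            if self.last_error is not None:
                raise self.last_error from pipe_error
            raise

    def finish(self):
        self._conn.close()                # consumer's recv_bytes() gets EOFError
        self.thread.join()
        if self.last_error is not None:
            raise self.last_error


def failing_consume(chunk):
    # Stand-in for the upload step failing (cf. the NotImplementedError above).
    raise NotImplementedError("simulated failure while uploading %d bytes" % len(chunk))


uploader = ThreadedUploader(failing_consume)
uploader.put(b"chunk 1")                  # the consumer receives it and fails
uploader.thread.join(timeout=5)           # let it record the error and close its end
caught = None
try:
    uploader.put(b"chunk 2")
except NotImplementedError as exc:
    caught = exc
print(repr(caught), "caused by", repr(caught.__cause__))
```

Chaining with `raise ... from` keeps both exceptions in the logged traceback. That means neither the pipe error nor the upload error is lost.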
>  
> My thoughts:
>  * Why does the pipeline stall at the `StartBundle` stage after such a BrokenPipeError? Could it be related to the uploader thread?
>  * Could seeking to an offset between the last block position and the current position be implemented without significant repercussions?
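For the second question, the numbers in the `NotImplementedError` above show what was requested:

- `offset - last` = 169869312 - 167772160 = 2097152 bytes (2 MiB).
- `position - last` = 176160768 - 167772160 = 8388608 bytes (8 MiB).

So the seek target falls 2 MiB into the last block that was read. Per the traceback, that target comes from `self.progress` in apitools' `RefreshResumableUploadState`. The sketch below is a hypothetical stream, not Beam's `filesystemio` class. It keeps the bytes of the last block and accepts a seek to any offset inside it, including offsets strictly between the block start and the current position, which is the case rejected in the log. It does not address the repercussions the question asks about, such as memory use or whether re-sending those bytes is correct for the resumable upload protocol.

```python
import io
import os


class RewindableBlockStream:
    """Hypothetical read-only stream that allows rewinding within the last block.

    `source` is any object whose read(n) returns up to n bytes (fewer only at
    EOF). Bytes from the start of the most recent read() onward are retained,
    so seek() may target any offset in [last_block_position, frontier].
    """

    def __init__(self, source):
        self._source = source
        self._buffer = b""               # bytes covering [last_block_position, frontier)
        self.last_block_position = 0
        self.position = 0

    def _frontier(self):
        # Offset just past the last byte pulled from the source.
        return self.last_block_position + len(self._buffer)

    def read(self, size):
        start = self.position
        missing = start + size - self._frontier()
        if missing > 0:
            self._buffer += self._source.read(missing)
        # Keep only the bytes from this block onward; older ones are dropped.
        self._buffer = self._buffer[start - self.last_block_position:]
        self.last_block_position = start
        data = self._buffer[:size]
        self.position = start + len(data)
        return data

    def tell(self):
        return self.position

    def seek(self, offset, whence=os.SEEK_SET):
        if whence != os.SEEK_SET:
            raise NotImplementedError("only SEEK_SET is supported in this sketch")
        if not self.last_block_position <= offset <= self._frontier():
            raise NotImplementedError(
                "offset: %d, position: %d, last: %d"
                % (offset, self.position, self.last_block_position))
        self.position = offset
        return offset


# Scaled-down version of the failing case: 8-byte blocks instead of 8 MiB.
stream = RewindableBlockStream(io.BytesIO(bytes(range(32))))
stream.read(8)                           # block [0, 8)
stream.read(8)                           # block [8, 16): last = 8, position = 16
stream.seek(10)                          # strictly inside the last block
print(list(stream.read(8)))              # [10, 11, 12, 13, 14, 15, 16, 17]
```

Only one block (plus any bytes already pulled past it) is retained. A seek before the start of the last block therefore still raises.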
>  
> The same issue, with a picture attached, is described [here|https://docs.google.com/document/d/13ccydJSpfdUU_czHg5NeP2_LPJh8E2NZknShVAemDYs/edit?usp=sharing].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)