Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 22:30:40 UTC
[GitHub] [beam] damccorm opened a new issue, #21263: (Broken Pipe induced) Bricked Dataflow Pipeline
damccorm opened a new issue, #21263:
URL: https://github.com/apache/beam/issues/21263
We are seeing Dataflow pipelines get stuck indefinitely. The common theme is a bundle failing with a Broken Pipe error, after which the next bundle gets stuck at the `StartBundle` stage (as reported by Dataflow).
Specifically, we see an exception like the following for a bundle (the re-raised exception log is truncated because it is long):
```
Error processing instruction process_bundle-7079259598045896145-12555. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1359, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", line 192, in process
    writer.write(element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1454, in write
    return self._file_handle.write(self._coder.encode(row) + b'\n')
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 200, in write
    self._uploader.put(b)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 661, in put
    self._conn.send_bytes(data.tobytes())
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in _send_bytes
    self._send(buf)
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
```
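The final frames show the failure surfacing from `send_bytes` on a `multiprocessing` connection. The standalone sketch below (not Beam code; `send_after_reader_closed` is a made-up name) reproduces that error in isolation, assuming Linux or macOS, where `multiprocessing.Pipe(duplex=False)` is backed by an OS pipe: once the receiving end has been closed, writing to the sending end raises `BrokenPipeError`.

```python
import multiprocessing


def send_after_reader_closed(payload: bytes) -> str:
    """Write to a one-way pipe whose receiving end was already closed.

    Hypothetical helper; returns the name of the exception that send_bytes raised.
    """
    reader, writer = multiprocessing.Pipe(duplex=False)  # (receive end, send end)
    reader.close()  # simulate the consumer side going away
    try:
        writer.send_bytes(payload)  # the OS write fails with EPIPE
    except BrokenPipeError as exc:
        return type(exc).__name__
    finally:
        writer.close()
    return "no error"


if __name__ == "__main__":
    print(send_after_reader_closed(b"row\n"))
```

Python ignores `SIGPIPE` by default, which is why the failed write shows up as an exception rather than killing the process.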
As mentioned above, the next bundle is then stuck at the `StartBundle` stage (as reported by Dataflow), and the progress report thread logs messages like these:
```
Operation ongoing for over 10087.60 seconds in state start-msecs in step Assign to Location for POI joins-ptransform-49654 without returning. Current Traceback:
  File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in task
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
    expected_inputs):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 459, in input_elements
    element = received.get(timeout=1)
  File "/usr/local/lib/python3.6/queue.py", line 173, in get
    self.not_empty.wait(remaining)
  File "/usr/local/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
```
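This traceback shows the stuck thread inside `input_elements`, calling `received.get(timeout=1)`. The sketch below is not Beam's code; `wait_for_element` is a hypothetical helper illustrating the general pattern: each `get` times out after a short slice and the loop tries again, so the thread stays alive and busy-looking but never finishes if no further element arrives. The overall `deadline_s` is an added assumption to make the hang observable; what exit conditions the real loop has is not visible from the traceback.

```python
import queue
import time


def wait_for_element(q, deadline_s, poll_s=0.05):
    """Hypothetical reader loop: poll a queue with a short timeout, retry on Empty.

    The overall deadline is an assumption added so a hang turns into an error.
    """
    start = time.monotonic()
    while True:
        try:
            return q.get(timeout=poll_s)
        except queue.Empty:
            # Nothing arrived in this time slice; the thread is alive but idle.
            if time.monotonic() - start >= deadline_s:
                raise TimeoutError("no element within %.2fs" % deadline_s)
```

Without the deadline check, this loop would sit in `get` indefinitely, much like the thread in the log above.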
*Some details about the Broken Pipe error*
As observed from the logs, the exception is raised at [this line](https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L720). Since the exception is specifically a `BrokenPipeError` (errno 32, `EPIPE`) rather than some other `OSError` (`BrokenPipeError` is a subclass of `OSError`), the connection must have been closed from the other end, i.e. [here](https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L716).
Since the pipe was closed from the other end, there must have been an error in [this](https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L705) try/except block. Searching through the logs does reveal an error in the thread spawned to upload the data sent through the pipe.
The error logged by the upload thread is as follows:
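The mechanism described above can be reproduced in miniature. The hypothetical `ToyUploader` below is not Beam's `GcsUploader`; it only mirrors the shape this report describes: a background thread reads chunks from a one-way pipe, records any exception in `last_error`, and closes its end in a `finally` block. Once that end is closed, the writer's next `send_bytes` fails with `BrokenPipeError`, and the writer re-raises the recorded root cause instead. The `fail_after` parameter is an invented stand-in for the real upload failure.

```python
import multiprocessing
import threading


class ToyUploader:
    """Hypothetical writer/uploader pair joined by a one-way pipe (not Beam code)."""

    def __init__(self, fail_after: int):
        self._recv, self._send = multiprocessing.Pipe(duplex=False)
        self._fail_after = fail_after  # invented stand-in for the upload failure
        self.last_error = None
        self.received = []
        self._thread = threading.Thread(target=self._upload, daemon=True)
        self._thread.start()

    def _upload(self):
        try:
            while True:
                chunk = self._recv.recv_bytes()  # EOFError once the writer closes
                if len(self.received) >= self._fail_after:
                    raise NotImplementedError("simulated upload failure")
                self.received.append(chunk)
        except EOFError:
            pass  # writer finished normally
        except Exception as exc:  # keep the root cause for the writer
            self.last_error = exc
        finally:
            self._recv.close()  # closing this end breaks the writer's pipe

    def put(self, data: bytes):
        try:
            self._send.send_bytes(data)
        except BrokenPipeError:
            self._thread.join()
            if self.last_error is not None:
                raise self.last_error  # surface the uploader's real error
            raise

    def wait(self, timeout=None):
        """Join the uploader thread; return True if it has finished."""
        self._thread.join(timeout)
        return not self._thread.is_alive()

    def close(self):
        self._send.close()
        self._thread.join()
```

Usage: with `fail_after=2`, the third chunk makes the thread fail and close its end; the writer's next `put` then raises the recorded `NotImplementedError` rather than a bare `BrokenPipeError`.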
```
Error in _start_upload while inserting file gs://hc-resolution-temp/bq_load_staging/055471a8-bef6-4afb-a850-3a5f9edc43f6/huq-core.enriched_impression_1.impressions/1901c43b-ecd0-49f7-a03a-6d6aa418d36d:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 649, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
    upload=upload, upload_config=upload_config)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
    http_request, client=self.client)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
    return self.StreamInChunks()
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
    additional_headers=additional_headers)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 971, in __StreamMedia
    self.RefreshResumableUploadState()
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 873, in RefreshResumableUploadState
    self.stream.seek(self.progress)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 302, in seek
    (offset, whence, self.position, self.last_block_position))
NotImplementedError: offset: 169869312, whence: 0, position: 176160768, last: 167772160
```
Specifically, it is [this error](https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/filesystemio.py#L300), raised when seeking to an offset that lies between the last block position and the current position.
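The rule behind this error can be modelled in a few lines. The function `check_seek` below is hypothetical and simplified; it reflects our reading of the linked `seek`, which appears to accept only a no-op seek to the end, a seek to the current position, or a rewind to exactly the start of the last block. Plugging in the logged numbers shows why the request fails: last = 167772160 (160 MiB), offset = 169869312 (162 MiB) and position = 176160768 (168 MiB), so the uploader (presumably resuming from the progress reported back by GCS, given the `self.stream.seek(self.progress)` frame) asked to rewind to a point 2 MiB into an 8 MiB block, which matches neither allowed target.

```python
import os


def check_seek(offset, whence, position, last_block_position, have_last_block=True):
    """Simplified, hypothetical model of the seek rule behind the error above.

    Assumption: only three seeks succeed - a no-op seek to the end, a seek to
    the current position, or a rewind to exactly the start of the last block.
    """
    if whence == os.SEEK_END and offset == 0:
        return "no-op (end)"
    if whence == os.SEEK_SET:
        if offset == position:
            return "no-op (current)"
        if offset == last_block_position and have_last_block:
            return "rewind last block"
    # Any other target, including one strictly inside the last block, fails.
    raise NotImplementedError(
        "offset: %s, whence: %s, position: %s, last: %s"
        % (offset, whence, position, last_block_position))
```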
My thoughts:
* Why does the pipeline stall at the `StartBundle` stage after such a `BrokenPipeError`? Does it have something to do with the uploader thread?
* Could seeking to an offset between the last block position and the current position be implemented without significant repercussions?
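On the second question, one possible answer is sketched below under explicit assumptions. `LastBlockStream` is a hypothetical, simplified stand-in for the pipe-backed stream that reads from an iterator of byte chunks instead of a pipe. Assuming the real stream already keeps the bytes of the last block it returned (the error message's `last` field suggests it at least tracks that block's position), any offset inside that block can be served by slicing it, so allowing such a seek would cost no extra memory. The sketch allows only one rewind per read, a simplification that keeps the `position == last_block_position + len(last_block)` bookkeeping trivial. Whether this is safe for the real upload path (for example, whether the bytes re-sent after such a rewind are exactly what GCS expects) is not established here.

```python
import os


class LastBlockStream:
    """Hypothetical stream that can rewind to any offset inside its last block.

    Reads from an iterator of byte chunks, standing in for a pipe. Only the
    most recently returned block is retained.
    """

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self.position = 0
        self._remaining = b""
        self.last_block_position = None
        self.last_block = b""

    def read(self, size):
        parts, got, start = [], 0, self.position
        while got < size:
            if not self._remaining:
                self._remaining = next(self._chunks, b"")
                if not self._remaining:
                    break  # input exhausted
            take = self._remaining[: size - got]
            self._remaining = self._remaining[len(take):]
            parts.append(take)
            got += len(take)
            self.position += len(take)
        block = b"".join(parts)
        if block:
            self.last_block_position, self.last_block = start, block
        return block

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_END and offset == 0:
            return  # no-op seek to the end
        if whence == os.SEEK_SET and offset == self.position:
            return  # already there
        lo = self.last_block_position
        if (whence == os.SEEK_SET and self.last_block and lo is not None
                and lo <= offset < self.position):
            # Invariant: position == lo + len(last_block), so this slice is the
            # tail of the last block starting at `offset`; push it back.
            self._remaining = self.last_block[offset - lo:] + self._remaining
            self.position = offset
            self.last_block = b""  # simplification: one rewind per read
            return
        raise NotImplementedError(
            "offset: %s, whence: %s, position: %s, last: %s"
            % (offset, whence, self.position, lo))
```

Usage: after reading two 8-byte blocks from a 16-byte input, `seek(10)` succeeds (2 bytes into the last block, a scaled-down version of the logged case), and the next read returns the remaining 6 bytes.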
The same issue, with a picture attached, is also written up [here](https://docs.google.com/document/d/13ccydJSpfdUU_czHg5NeP2_LPJh8E2NZknShVAemDYs/edit?usp=sharing).
Imported from Jira [BEAM-12843](https://issues.apache.org/jira/browse/BEAM-12843). Original Jira may contain additional context.
Reported by: ryantam626.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] kennknowles commented on issue #21263: (Broken Pipe induced) Bricked Dataflow Pipeline
Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #21263:
URL: https://github.com/apache/beam/issues/21263#issuecomment-1246017170
Since this hasn't had an update or a "me too" in a long time, I am going to close it on the assumption that it was resolved or transient. It seemed specific to a particular customer/pipeline, so it is most effectively debugged via user support unless we have a repro. Feel free to reopen if it happens more, or if you have more information or a reproduction.
[GitHub] [beam] kennknowles closed issue #21263: (Broken Pipe induced) Bricked Dataflow Pipeline
Posted by GitBox <gi...@apache.org>.
kennknowles closed issue #21263: (Broken Pipe induced) Bricked Dataflow Pipeline
URL: https://github.com/apache/beam/issues/21263
[GitHub] [beam] ewianda commented on issue #21263: (Broken Pipe induced) Bricked Dataflow Pipeline
Posted by GitBox <gi...@apache.org>.
ewianda commented on issue #21263:
URL: https://github.com/apache/beam/issues/21263#issuecomment-1354920391
me too