Posted to issues@beam.apache.org by "Ryan Tam (Jira)" <ji...@apache.org> on 2021/09/06 11:30:00 UTC

[jira] [Updated] (BEAM-12843) (Broken Pipe induced) Bricked Dataflow Pipeline 

     [ https://issues.apache.org/jira/browse/BEAM-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Tam updated BEAM-12843:
----------------------------
    Description: 
We are seeing Dataflow pipelines getting stuck indefinitely. The common pattern is that a bundle fails with a Broken Pipe error, and the next bundle then gets stuck at the `StartBundle` stage (as reported by Dataflow).

Specifically, we see an exception like the following for a bundle (the re-raised exception log is truncated because it is long):

 
{code:java}
"Error processing instruction process_bundle-7079259598045896145-12555. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1359, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", line 192, in process
    writer.write(element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1454, in write
    return self._file_handle.write(self._coder.encode(row) + b'\n')
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 200, in write
    self._uploader.put(b)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 661, in put
    self._conn.send_bytes(data.tobytes())
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in _send_bytes
    self._send(buf)
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
{code}
As mentioned above, the next bundle is stuck at the `StartBundle` stage (as reported by Dataflow), and the progress report thread logs messages like this:

 
{code:java}
"Operation ongoing for over 10087.60 seconds in state start-msecs in step Assign to Location for POI joins-ptransform-49654  without returning. Current Traceback:
  File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()

  File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()

  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()

  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))

  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in task
    lambda: self.create_worker().do_instruction(request), request)

  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()

  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)

  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
    getattr(request, request_type), request.instruction_id)

  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))

  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
    expected_inputs):

  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 459, in input_elements
    element = received.get(timeout=1)

  File "/usr/local/lib/python3.6/queue.py", line 173, in get
    self.not_empty.wait(remaining)

  File "/usr/local/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
{code}
 

*Some details about the Broken Pipe error*

As the logs show, the exception is related to [this line|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L720]. Since the exception is a BrokenPipeError rather than a more general OSError, the connection must have been closed from the other end, i.e. [here|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L716].

Since it was closed from the other end, there must have been an error in [this|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L705] try/except block. Searching through the logs does reveal an error in the thread spawned to upload the data sent through the pipe.
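
To illustrate the reasoning above, here is a minimal sketch (not the Beam code) of how a failure in the thread holding the other end of a `multiprocessing.Pipe()` can surface as a BrokenPipeError on the writer side. The names `failing_uploader` and `write_after_uploader_failure` are hypothetical, the simulated failure is a stand-in for the real upload error, and the behaviour shown assumes a POSIX system.

```python
import multiprocessing
import threading


def failing_uploader(child_conn):
    """Hypothetical stand-in for the upload thread: it fails, then closes its end."""
    try:
        raise NotImplementedError("simulated seek failure during upload")
    except NotImplementedError:
        pass  # assumption: the real thread logs the error at this point
    finally:
        child_conn.close()


def write_after_uploader_failure():
    """Write to the pipe after the peer has closed; return the exception raised."""
    parent_conn, child_conn = multiprocessing.Pipe()
    uploader = threading.Thread(target=failing_uploader, args=(child_conn,))
    uploader.start()
    uploader.join()  # by now the other end of the pipe is closed
    try:
        parent_conn.send_bytes(b"row\n")
    except BrokenPipeError as exc:
        return exc
    finally:
        parent_conn.close()
    return None
```

Note that BrokenPipeError is itself a subclass of OSError, so the distinction being drawn is that the error is specifically EPIPE (the peer closed the pipe) and not some other I/O failure.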

The error is as follows:

 
{code:java}
"Error in _start_upload while inserting file gs://hc-resolution-temp/bq_load_staging/055471a8-bef6-4afb-a850-3a5f9edc43f6/huq-core.enriched_impression_1.impressions/1901c43b-ecd0-49f7-a03a-6d6aa418d36d: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 649, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
    upload=upload, upload_config=upload_config)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
    http_request, client=self.client)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
    return self.StreamInChunks()
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
    additional_headers=additional_headers)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 971, in __StreamMedia
    self.RefreshResumableUploadState()
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 873, in RefreshResumableUploadState
    self.stream.seek(self.progress)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 302, in seek
    (offset, whence, self.position, self.last_block_position))
NotImplementedError: offset: 169869312, whence: 0, position: 176160768, last: 167772160
{code}
 

Specifically, it is [this error|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/filesystemio.py#L300], which is raised when seeking to an offset between the last block position and the current position.
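
For reference, the numbers in the error message are 160 MiB (last), 162 MiB (offset) and 168 MiB (position), so the requested offset falls 2 MiB into the most recently read block. The following is a simplified model of that situation, not the Beam implementation: `LastBlockOnlyStream` is a hypothetical class that, as an assumption drawn from the error message, accepts a seek only to the current position or to the start of the last block.

```python
import os


class LastBlockOnlyStream:
    """Hypothetical model: seek() accepts only the current position or the last block start."""

    def __init__(self, position, last_block_position):
        self.position = position
        self.last_block_position = last_block_position

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET and offset == self.position:
            return  # already there, nothing to do
        if whence == os.SEEK_SET and offset == self.last_block_position:
            self.position = offset  # rewind to the start of the last block
            return
        raise NotImplementedError(
            "offset: %s, whence: %s, position: %s, last: %s"
            % (offset, whence, self.position, self.last_block_position))


MIB = 1024 * 1024
stream = LastBlockOnlyStream(position=168 * MIB, last_block_position=160 * MIB)
try:
    stream.seek(162 * MIB)  # the offset requested in the log above
    seek_error = None
except NotImplementedError as exc:
    seek_error = str(exc)
```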

 

My thoughts:
 * Why is the pipeline stalled at the `StartBundle` stage after such a Broken Pipe Error? Something to do with the uploader thread?
 * Could seeking an offset between the last block position and current offset be implemented without significant repercussions?
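
On the second question, one possible approach is sketched below. It is only an illustration under stated assumptions, not a proposed patch: `RetainedBlockStream` is a hypothetical class, it assumes the most recently returned block is still held in memory, and it does not consider how apitools would behave after such a seek.

```python
class RetainedBlockStream:
    """Hypothetical stream that keeps its last block so any offset inside it can be replayed."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)   # source of bytes, e.g. data arriving over a pipe
        self._block = b""             # most recently returned block, kept for replay
        self.last_block_position = 0  # absolute offset where _block starts
        self.position = 0             # absolute offset of the next byte to return

    def read_block(self):
        block_end = self.last_block_position + len(self._block)
        if self.position < block_end:
            # A seek moved us back into the retained block: replay its tail.
            data = self._block[self.position - self.last_block_position:]
            self.position = block_end
            return data
        data = next(self._chunks, b"")
        if data:
            self.last_block_position = block_end
            self._block = data
            self.position = block_end + len(data)
        return data

    def seek(self, offset):
        block_end = self.last_block_position + len(self._block)
        if self.last_block_position <= offset <= block_end:
            self.position = offset
            return
        raise NotImplementedError(
            "offset: %s, position: %s, last: %s"
            % (offset, self.position, self.last_block_position))
```

With this model, after reading two 8-byte blocks, `seek(10)` followed by `read_block()` returns the last 6 bytes of the second block, while `seek(4)` still fails because the first block is no longer retained.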

 

The same issue, with a picture attached, is described [here|https://docs.google.com/document/d/13ccydJSpfdUU_czHg5NeP2_LPJh8E2NZknShVAemDYs/edit?usp=sharing].


> (Broken Pipe induced) Bricked Dataflow Pipeline 
> ------------------------------------------------
>
>                 Key: BEAM-12843
>                 URL: https://issues.apache.org/jira/browse/BEAM-12843
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp, runner-dataflow
>    Affects Versions: 2.31.0
>            Reporter: Ryan Tam
>            Priority: P2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)