Posted to user@beam.apache.org by Deepak Nagaraj <de...@primer.ai> on 2022/05/15 05:25:06 UTC

Beam Python pipeline hangs

Hi Beam users,

I'm facing a problem with a Beam Python pipeline. It is running on Flink,
reading from Kafka in an unbounded way, with the use_deprecated_read flag
set. I then have two beam.Map() calls, followed by a beam.WindowInto() and
then a write to a file system.
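
A rough sketch of the pipeline shape described above (parse_record,
enrich_record, the Kafka settings, the window size, and the file sink are
placeholders, not the actual code from this report):

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

def parse_record(record):   # placeholder for the first beam.Map()
    return record

def enrich_record(record):  # placeholder for the second beam.Map()
    return record

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--experiments=use_deprecated_read',
])

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | 'ReadKafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092'},
            topics=['input-topic'])
        | 'Map1' >> beam.Map(parse_record)
        # A Reshuffle() between the maps was also tried (to prevent
        # fusion); it made no difference.
        | 'Map2' >> beam.Map(enrich_record)
        | 'Window' >> beam.WindowInto(FixedWindows(60))
        | 'WriteToFS' >> fileio.WriteToFiles(path='/tmp/output')
    )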

When I send a batch of 1000 small messages (20 bytes each), I have no
problems. However, when I send a batch of 1000 large messages (1 kilobyte
each), the pipeline freezes after some time. The exact location varies;
however, I notice there is always a gap of 10-12 records between the Kafka
records sent and the records received by the Python step. This is
remarkably consistent. Also, when I cancel the Flink job, I see a set of
stack traces on the console. [1]

A similar Java pipeline works fine, and the Python pipeline also works
fine if I have only one beam.Map() call. Adding a Reshuffle() to prevent
fusion makes no difference.

It seems like we have a problem passing messages from stage to stage
within Python. I'm wondering if there is a buffer of about 10-12 KB in the
Python SDK that fills up and then blocks the pipeline from progressing?

Thanks,
Deepak

[1] Sample Python stack trace; this is printed automatically when I cancel
the Beam pipeline job in the Flink UI

>
INFO:__main__:Stopping worker 1-1
Fatal Python error: could not acquire lock for <_io.BufferedWriter
name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Python runtime state: finalizing (tstate=0x7f8849f048e0)

Thread 0x000070000e577000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x000070000f57a000 (most recent call first):
  File "/Users/deepaknagaraj/dev/utils/kafka_to_fs/worker.py", line 27 in read_kafka_record
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1639 in <lambda>
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228 in process_encoded
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1002 in process_bundle
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625 in process_bundle
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 587 in do_instruction
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 347 in <lambda>
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 274 in _execute
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 346 in task
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x000070001a61e000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 433 in acquire
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 57 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x0000700018618000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 1202 in invoke_excepthook
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 934 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x0000700017615000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line 170 in get
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 581 in _write_outputs
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py", line 203 in consume_request_iterator
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x000070001460c000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line 170 in get
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 987 in request_iter
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py", line 203 in consume_request_iterator
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x000070000c571000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 433 in acquire
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 57 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x0000700013586000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line 170 in get
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 233 in get_responses
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py", line 203 in consume_request_iterator
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x0000700011580000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 306 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 558 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 214 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Current thread 0x000000010b173600 (most recent call first):
<no Python frame>
^C

Re: Beam Python pipeline hangs

Posted by Deepak Nagaraj <de...@primer.ai>.
Thanks very much for your attention to this problem!
Deepak

On Mon, May 16, 2022 at 1:42 PM Robert Bradshaw <ro...@google.com> wrote:
>
> I've filed https://issues.apache.org/jira/browse/BEAM-14476 to track this.
>
> On Mon, May 16, 2022 at 1:30 PM Deepak Nagaraj <de...@primer.ai> wrote:
> >
> > I just tried DirectRunner -- I did not see any problem with it.  It
> > does happen with Flink.
> >
> > Deepak
> >
> > On Mon, May 16, 2022 at 12:57 PM Robert Bradshaw <ro...@google.com> wrote:
> > >
> > > Is this just on Flink, or does this happen on other runners too?
> > >
> > > On Mon, May 16, 2022 at 12:39 PM Deepak Nagaraj
> > > <de...@primer.ai> wrote:
> > > >
> > > > Hi Robert,
> > > >
> > > > On Mon, May 16, 2022 at 12:33 PM Robert Bradshaw <ro...@google.com> wrote:
> > > > >
> > > > > On Mon, May 16, 2022 at 12:01 PM Deepak Nagaraj
> > > > > <de...@primer.ai> wrote:
> > > > > >
> > > > >
> > > > > I can imagine contention for an I/O lock, but I'm not sure how that
> > > > > would lead to a deadlock. But hopefully knowing that print() is
> > > > > involved should allow a more minimal reproduction of the issue.
> > > > >
> > > >
> > > > Yes, I've enclosed [1] a minimal pipeline that reproduces the problem
> > > > as well as the last set of logs. Per the logs, the problem occurs even
> > > > with a single worker thread.
> > > >
> > > > Thanks,
> > > > Deepak
> > > >
> > > > [1] Minimal Beam pipeline that stalls due to deadlock:
> > > >
> > > > def _run_pipeline(pipeline):
> > > >     def process_data(unused):
> > > >         print('a'*1000)
> > > >
> > > >     _ = (
> > > >             pipeline
> > > >             | "Create" >> beam.Create(['a']*1000)
> > > >             | "Process" >> beam.Map(process_data)
> > > >     )
> > > >     pipeline.run().wait_until_finish()
> > > >
> > > > [2] Last set of logs from the Python worker pool:
> > > >
> > > > DEBUG:apache_beam.runners.worker.sdk_worker:Got work 90
> > > > DEBUG:apache_beam.runners.worker.sdk_worker:Currently using 1 threads.
> > > > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > > > Process output_tags=['None'], receivers=[ConsumerSet[Process.out0,
> > > > coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> > > > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > > > Create/Map(decode) output_tags=['None'],
> > > > receivers=[SingletonConsumerSet[Create/Map(decode).out0,
> > > > coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> > > > DEBUG:apache_beam.runners.worker.bundle_processor:start
> > > > <DataInputOperation fn/read/ref_PCollection_PCollection_7:0
> > > > receivers=[SingletonConsumerSet[fn/read/ref_PCollection_PCollection_7:0.out0,
> > > > coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>

Re: Beam Python pipeline hangs

Posted by Robert Bradshaw <ro...@google.com>.
I've filed https://issues.apache.org/jira/browse/BEAM-14476 to track this.

On Mon, May 16, 2022 at 1:30 PM Deepak Nagaraj <de...@primer.ai> wrote:
>
> I just tried DirectRunner -- I did not see any problem with it.  It
> does happen with Flink.
>
> Deepak
>
> On Mon, May 16, 2022 at 12:57 PM Robert Bradshaw <ro...@google.com> wrote:
> >
> > Is this just on Flink, or does this happen on other runners too?
> >
> > On Mon, May 16, 2022 at 12:39 PM Deepak Nagaraj
> > <de...@primer.ai> wrote:
> > >
> > > Hi Robert,
> > >
> > > On Mon, May 16, 2022 at 12:33 PM Robert Bradshaw <ro...@google.com> wrote:
> > > >
> > > > On Mon, May 16, 2022 at 12:01 PM Deepak Nagaraj
> > > > <de...@primer.ai> wrote:
> > > > >
> > > >
> > > > I can imagine contention for an I/O lock, but I'm not sure how that
> > > > would lead to a deadlock. But hopefully knowing that print() is
> > > > involved should allow a more minimal reproduction of the issue.
> > > >
> > >
> > > Yes, I've enclosed [1] a minimal pipeline that reproduces the problem
> > > as well as the last set of logs. Per the logs, the problem occurs even
> > > with a single worker thread.
> > >
> > > Thanks,
> > > Deepak
> > >
> > > [1] Minimal Beam pipeline that stalls due to deadlock:
> > >
> > > def _run_pipeline(pipeline):
> > >     def process_data(unused):
> > >         print('a'*1000)
> > >
> > >     _ = (
> > >             pipeline
> > >             | "Create" >> beam.Create(['a']*1000)
> > >             | "Process" >> beam.Map(process_data)
> > >     )
> > >     pipeline.run().wait_until_finish()
> > >
> > > [2] Last set of logs from the Python worker pool:
> > >
> > > DEBUG:apache_beam.runners.worker.sdk_worker:Got work 90
> > > DEBUG:apache_beam.runners.worker.sdk_worker:Currently using 1 threads.
> > > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > > Process output_tags=['None'], receivers=[ConsumerSet[Process.out0,
> > > coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> > > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > > Create/Map(decode) output_tags=['None'],
> > > receivers=[SingletonConsumerSet[Create/Map(decode).out0,
> > > coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> > > DEBUG:apache_beam.runners.worker.bundle_processor:start
> > > <DataInputOperation fn/read/ref_PCollection_PCollection_7:0
> > > receivers=[SingletonConsumerSet[fn/read/ref_PCollection_PCollection_7:0.out0,
> > > coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>

Re: Beam Python pipeline hangs

Posted by Deepak Nagaraj <de...@primer.ai>.
I just tried DirectRunner -- I did not see any problem with it.  It
does happen with Flink.

Deepak

On Mon, May 16, 2022 at 12:57 PM Robert Bradshaw <ro...@google.com> wrote:
>
> Is this just on Flink, or does this happen on other runners too?
>
> On Mon, May 16, 2022 at 12:39 PM Deepak Nagaraj
> <de...@primer.ai> wrote:
> >
> > Hi Robert,
> >
> > On Mon, May 16, 2022 at 12:33 PM Robert Bradshaw <ro...@google.com> wrote:
> > >
> > > On Mon, May 16, 2022 at 12:01 PM Deepak Nagaraj
> > > <de...@primer.ai> wrote:
> > > >
> > >
> > > I can imagine contention for an I/O lock, but I'm not sure how that
> > > would lead to a deadlock. But hopefully knowing that print() is
> > > involved should allow a more minimal reproduction of the issue.
> > >
> >
> > Yes, I've enclosed [1] a minimal pipeline that reproduces the problem
> > as well as the last set of logs. Per the logs, the problem occurs even
> > with a single worker thread.
> >
> > Thanks,
> > Deepak
> >
> > [1] Minimal Beam pipeline that stalls due to deadlock:
> >
> > def _run_pipeline(pipeline):
> >     def process_data(unused):
> >         print('a'*1000)
> >
> >     _ = (
> >             pipeline
> >             | "Create" >> beam.Create(['a']*1000)
> >             | "Process" >> beam.Map(process_data)
> >     )
> >     pipeline.run().wait_until_finish()
> >
> > [2] Last set of logs from the Python worker pool:
> >
> > DEBUG:apache_beam.runners.worker.sdk_worker:Got work 90
> > DEBUG:apache_beam.runners.worker.sdk_worker:Currently using 1 threads.
> > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > Process output_tags=['None'], receivers=[ConsumerSet[Process.out0,
> > coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > Create/Map(decode) output_tags=['None'],
> > receivers=[SingletonConsumerSet[Create/Map(decode).out0,
> > coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> > DEBUG:apache_beam.runners.worker.bundle_processor:start
> > <DataInputOperation fn/read/ref_PCollection_PCollection_7:0
> > receivers=[SingletonConsumerSet[fn/read/ref_PCollection_PCollection_7:0.out0,
> > coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>

Re: Beam Python pipeline hangs

Posted by Robert Bradshaw <ro...@google.com>.
Is this just on Flink, or does this happen on other runners too?

On Mon, May 16, 2022 at 12:39 PM Deepak Nagaraj
<de...@primer.ai> wrote:
>
> Hi Robert,
>
> On Mon, May 16, 2022 at 12:33 PM Robert Bradshaw <ro...@google.com> wrote:
> >
> > On Mon, May 16, 2022 at 12:01 PM Deepak Nagaraj
> > <de...@primer.ai> wrote:
> > >
> >
> > I can imagine contention for an I/O lock, but I'm not sure how that
> > would lead to a deadlock. But hopefully knowing that print() is
> > involved should allow a more minimal reproduction of the issue.
> >
>
> Yes, I've enclosed [1] a minimal pipeline that reproduces the problem
> as well as the last set of logs. Per the logs, the problem occurs even
> with a single worker thread.
>
> Thanks,
> Deepak
>
> [1] Minimal Beam pipeline that stalls due to deadlock:
>
> def _run_pipeline(pipeline):
>     def process_data(unused):
>         print('a'*1000)
>
>     _ = (
>             pipeline
>             | "Create" >> beam.Create(['a']*1000)
>             | "Process" >> beam.Map(process_data)
>     )
>     pipeline.run().wait_until_finish()
>
> [2] Last set of logs from the Python worker pool:
>
> DEBUG:apache_beam.runners.worker.sdk_worker:Got work 90
> DEBUG:apache_beam.runners.worker.sdk_worker:Currently using 1 threads.
> DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> Process output_tags=['None'], receivers=[ConsumerSet[Process.out0,
> coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> Create/Map(decode) output_tags=['None'],
> receivers=[SingletonConsumerSet[Create/Map(decode).out0,
> coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> DEBUG:apache_beam.runners.worker.bundle_processor:start
> <DataInputOperation fn/read/ref_PCollection_PCollection_7:0
> receivers=[SingletonConsumerSet[fn/read/ref_PCollection_PCollection_7:0.out0,
> coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>

Re: Beam Python pipeline hangs

Posted by Deepak Nagaraj <de...@primer.ai>.
Hi Robert,

On Mon, May 16, 2022 at 12:33 PM Robert Bradshaw <ro...@google.com> wrote:
>
> On Mon, May 16, 2022 at 12:01 PM Deepak Nagaraj
> <de...@primer.ai> wrote:
> >
>
> I can imagine contention for an I/O lock, but I'm not sure how that
> would lead to a deadlock. But hopefully knowing that print() is
> involved should allow a more minimal reproduction of the issue.
>

Yes, I've enclosed a minimal pipeline that reproduces the problem [1], as
well as the last set of logs [2]. Per the logs, the problem occurs even
with a single worker thread.

Thanks,
Deepak

[1] Minimal Beam pipeline that stalls due to deadlock:

def _run_pipeline(pipeline):
    def process_data(unused):
        print('a'*1000)

    _ = (
            pipeline
            | "Create" >> beam.Create(['a']*1000)
            | "Process" >> beam.Map(process_data)
    )
    pipeline.run().wait_until_finish()
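
To make the snippet above self-contained, it would need imports and a
driver along these lines (the runner options here are illustrative only;
per the thread, the hang was seen with FlinkRunner and did not reproduce
with DirectRunner):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

if __name__ == '__main__':
    # Illustrative options; the real job ran on a portable Flink setup.
    options = PipelineOptions(['--runner=FlinkRunner'])
    _run_pipeline(beam.Pipeline(options=options))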

[2] Last set of logs from the Python worker pool:

DEBUG:apache_beam.runners.worker.sdk_worker:Got work 90
DEBUG:apache_beam.runners.worker.sdk_worker:Currently using 1 threads.
DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
Process output_tags=['None'], receivers=[ConsumerSet[Process.out0,
coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
Create/Map(decode) output_tags=['None'],
receivers=[SingletonConsumerSet[Create/Map(decode).out0,
coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:start
<DataInputOperation fn/read/ref_PCollection_PCollection_7:0
receivers=[SingletonConsumerSet[fn/read/ref_PCollection_PCollection_7:0.out0,
coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>

Re: Beam Python pipeline hangs

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, May 16, 2022 at 12:01 PM Deepak Nagaraj
<de...@primer.ai> wrote:
>
> Hi Robert,
>
> Thanks, a coworker and I have narrowed this down to a print(msg) statement in one of my beam.Map() functions. If we remove the print(), the pipeline does not stall. The print() itself is necessary but not sufficient: the problem occurs only when there are many large messages coming into the pipeline.

Ah, that's a nice find.

> It may be that we have a deadlock. This pipeline creates the same conditions as in Python issue42717 [1]. Especially this is relevant:
>
> ```
> In your example, the thread is very likely to be busy doing IO, so holding the io lock.
> ```
>
> As in the above ticket, the thread dump I posted previously also shows the same message about "could not acquire lock ... at interpreter shutdown, possibly due to daemon threads". A simple pure Python program can reproduce this issue. [2]
>
> So this explains the thread dump at shutdown. We still have the question of why the pipeline stalled in the first place. My theory is that the worker threads were already deadlocked. On my console, the worker logs abruptly stop when this happens, adding evidence to this hypothesis. So Python merely detected it (with a timed-wait on lock acquisition) and surfaced it at shutdown when I cancelled the pipeline job much later.
>
> As for what causes the deadlock within Beam pipeline, I'm not sure. If we imagine the print() statement in beam.Map() to be holding a lock and then also blocking, and if we have 2 such threads in Beam worker pool contending for I/O lock, can a deadlock condition occur and stall the pipeline?

I can imagine contention for an I/O lock, but I'm not sure how that
would lead to a deadlock. But hopefully knowing that print() is
involved should allow a more minimal reproduction of the issue.

> Even if we don't get to the full root cause, could we do what the Python folks did [3]: check for the condition somehow and abort when this happens? Another option would be to have a single worker pool, so that we don't run into the problem of multi-threading.
>
> To answer your question, no, the problem happens even when I write to a Kafka sink. Turns out we only need a print() statement in a Python beam.Map() call and sufficient I/O throughput to reproduce the problem.
>
> Thanks,
> Deepak
>
> [1] https://bugs.python.org/issue42717
> [2] https://bugs.python.org/file37835/dio.py
> [3] https://bugs.python.org/issue23309
>
> On Mon, May 16, 2022 at 9:41 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> This is strange, especially the part about 1 vs. 2 map calls making a
>> difference. Is the window into and write to filesystem necessary to
>> reproduce this as well?
>>
>> On Sat, May 14, 2022 at 10:25 PM Deepak Nagaraj
>> <de...@primer.ai> wrote:
>> >
>> > Hi Beam users,
>> >
>> > I'm facing a problem with a Beam Python pipeline. It is running on Flink, reading from Kafka in an unbounded way, and I have use_deprecated_read flag set. I then have 2 beam.Map() calls, followed by a .windowInto() and then a write to a file system.
>> >
>> > When I send a batch (1000) of small messages (20 bytes), I have no problems. However, when I send a batch (1000) of large messages (1 kilobytes), the pipeline freezes after some time. The exact location varies, however, I notice that there is always 10-12 records worth of gap in Kafka records sent and the Python step records received. This is remarkably consistent. Also, when I cancel the Flink job, I see a set of stack traces on the console. [1]
>> >
>> > A similar Java pipeline works fine, also if I have only one beam.Map() call, it works fine. If I add a Reshuffle() to prevent fusion, it makes no difference.
>> >
>> > It seems like we have a problem posting messages from stage to stage within Python. I'm wondering if there's a buffer in the Python SDK of about 10-12 KB that gets filled up and then blocks the pipeline from progress?
>> >
>> > Thanks,
>> > Deepak
>> >

Re: Beam Python pipeline hangs

Posted by Deepak Nagaraj <de...@primer.ai>.
Hi Robert,

Thanks, a coworker and I have narrowed this down to a print(msg) statement
in one of my beam.Map() functions. If we remove the print(), the pipeline
does not stall. The print() itself is necessary but not sufficient: the
problem occurs only when there are many large messages coming into the
pipeline.

It may be that we have a deadlock. This pipeline creates the same
conditions as in Python issue 42717 [1]. This comment in particular is
relevant:

```
In your example, the thread is very likely to be busy doing IO, so holding
the io lock.
```

As in the above ticket, the thread dump I posted previously also shows the
same message about "could not acquire lock ... at interpreter shutdown,
possibly due to daemon threads". A simple pure Python program can reproduce
this issue. [2]
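
A minimal sketch along those lines (not the linked dio.py itself, and
whether it actually triggers is timing- and platform-dependent): daemon
threads writing to the shared, buffered stdout while the main thread
exits.

import sys
import threading
import time

def spam():
    # Daemon threads keep writing large lines to the buffered stdout.
    while True:
        print('x' * 1000)

for _ in range(4):
    threading.Thread(target=spam, daemon=True).start()

# Let the daemon threads get going, then exit. At interpreter shutdown the
# daemon threads are frozen; if one of them still holds the stdout buffer
# lock, the final flush can fail with "Fatal Python error: could not
# acquire lock for <_io.BufferedWriter name='<stdout>'> ...".
time.sleep(0.5)
sys.exit(0)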

So this explains the thread dump at shutdown. We still have the question
of why the pipeline stalled in the first place. My theory is that the
worker threads were already deadlocked; on my console, the worker logs
stop abruptly when this happens, which supports this hypothesis. Python
merely detected the deadlock (via a timed wait on lock acquisition) and
surfaced it at shutdown, when I cancelled the pipeline job much later.

As for what causes the deadlock within the Beam pipeline, I'm not sure.
If we imagine the print() statement in beam.Map() holding a lock and then
blocking, and if we have two such threads in the Beam worker pool
contending for the I/O lock, can a deadlock occur and stall the pipeline?

Even if we don't get to the full root cause, could we do what the Python
folks did [3]: check for the condition somehow and abort when it happens?
Another option would be to have a single-threaded worker pool, so that we
avoid multi-threading problems.
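
One user-level way to at least surface the stuck state (a sketch only,
not something Beam provides): the standard-library faulthandler module
can dump every thread's stack on a signal, or after a timeout, so a
stalled worker can be inspected without cancelling the job.

import faulthandler
import signal
import sys

# Dump all thread stacks to stderr when the worker process receives
# SIGUSR1 (e.g. `kill -USR1 <worker pid>`), without killing the process.
faulthandler.register(signal.SIGUSR1, file=sys.stderr, all_threads=True)

# Or arm a watchdog: dump all stacks (and optionally exit) after the
# timeout unless faulthandler.cancel_dump_traceback_later() is called.
faulthandler.dump_traceback_later(600, exit=False)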

To answer your question: no, the problem happens even when I write to a
Kafka sink. It turns out we only need a print() statement in a Python
beam.Map() call and sufficient I/O throughput to reproduce the problem.

Thanks,
Deepak

[1] https://bugs.python.org/issue42717
[2] https://bugs.python.org/file37835/dio.py
[3] https://bugs.python.org/issue23309

On Mon, May 16, 2022 at 9:41 AM Robert Bradshaw <ro...@google.com> wrote:

> This is strange, especially the part about 1 vs. 2 map calls making a
> difference. Is the window into and write to filesystem necessary to
> reproduce this as well?
>
> On Sat, May 14, 2022 at 10:25 PM Deepak Nagaraj
> <de...@primer.ai> wrote:
> >
> > Hi Beam users,
> >
> > I'm facing a problem with a Beam Python pipeline. It is running on
> Flink, reading from Kafka in an unbounded way, and I have
> use_deprecated_read flag set. I then have 2 beam.Map() calls, followed by a
> .windowInto() and then a write to a file system.
> >
> > When I send a batch (1000) of small messages (20 bytes), I have no
> problems. However, when I send a batch (1000) of large messages (1
> kilobytes), the pipeline freezes after some time. The exact location
> varies, however, I notice that there is always 10-12 records worth of gap
> in Kafka records sent and the Python step records received. This is
> remarkably consistent. Also, when I cancel the Flink job, I see a set of
> stack traces on the console. [1]
> >
> > A similar Java pipeline works fine, also if I have only one beam.Map()
> call, it works fine. If I add a Reshuffle() to prevent fusion, it makes no
> difference.
> >
> > It seems like we have a problem posting messages from stage to stage
> within Python. I'm wondering if there's a buffer in the Python SDK of about
> 10-12 KB that gets filled up and then blocks the pipeline from progress?
> >
> > Thanks,
> > Deepak
> >

Re: Beam Python pipeline hangs

Posted by Robert Bradshaw <ro...@google.com>.
This is strange, especially the part about one vs. two map calls making a
difference. Are the WindowInto and the write to the filesystem necessary
to reproduce this as well?

On Sat, May 14, 2022 at 10:25 PM Deepak Nagaraj
<de...@primer.ai> wrote:
>
> Hi Beam users,
>
> I'm facing a problem with a Beam Python pipeline. It is running on Flink, reading from Kafka in an unbounded way, and I have use_deprecated_read flag set. I then have 2 beam.Map() calls, followed by a .windowInto() and then a write to a file system.
>
> When I send a batch (1000) of small messages (20 bytes), I have no problems. However, when I send a batch (1000) of large messages (1 kilobytes), the pipeline freezes after some time. The exact location varies, however, I notice that there is always 10-12 records worth of gap in Kafka records sent and the Python step records received. This is remarkably consistent. Also, when I cancel the Flink job, I see a set of stack traces on the console. [1]
>
> A similar Java pipeline works fine, also if I have only one beam.Map() call, it works fine. If I add a Reshuffle() to prevent fusion, it makes no difference.
>
> It seems like we have a problem posting messages from stage to stage within Python. I'm wondering if there's a buffer in the Python SDK of about 10-12 KB that gets filled up and then blocks the pipeline from progress?
>
> Thanks,
> Deepak
>