Posted to issues@beam.apache.org by "Ankur Goenka (Jira)" <ji...@apache.org> on 2019/09/13 02:02:00 UTC
[jira] [Updated] (BEAM-8226) Python Streaming Pipeline getting stuck in dataflow
[ https://issues.apache.org/jira/browse/BEAM-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ankur Goenka updated BEAM-8226:
-------------------------------
Fix Version/s: 2.16.0
> Python Streaming Pipeline getting stuck in dataflow
> ---------------------------------------------------
>
> Key: BEAM-8226
> URL: https://issues.apache.org/jira/browse/BEAM-8226
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Ankur Goenka
> Priority: Major
> Fix For: 2.16.0
>
>
> Python streaming pipelines are getting stuck with the following error when running on Dataflow.
>
> Relevant thread stacks
> For Java (Dataflow worker)
> --- Threads (4): [Thread[Thread-19,1,main], Thread[Thread-20,1,main], Thread[Thread-21,1,main], Thread[Thread-22,1,main]] State: WAITING stack: ---
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:175)
> org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> For Python
> --- Thread #139819623634688 name: ThreadPoolExecutor-1_0 ---
> File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
> self._bootstrap_inner()
> File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
> self.run()
> File "/usr/local/lib/python3.6/threading.py", line 864, in run
> self._target(*self._args, **self._kwargs)
> File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 69, in _worker
> work_item.run()
> File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
> result = self.fn(*self.args, **self.kwargs)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in task
> self._execute(lambda: worker.do_instruction(work), work)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
> response = task()
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
> self._execute(lambda: worker.do_instruction(work), work)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
> request.instruction_id)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 661, in process_bundle
> instruction_id, expected_transforms):
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 213, in input_elements
> data = received.get(timeout=1)
> File "/usr/local/lib/python3.6/queue.py", line 173, in get
> self.not_empty.wait(remaining)
> File "/usr/local/lib/python3.6/threading.py", line 299, in wait
> gotit = waiter.acquire(True, timeout)