Posted to user@flink.apache.org by Gorjan Todorovski <go...@gmail.com> on 2022/05/31 11:54:18 UTC

Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

Hi,

I am running a TensorFlow Extended (TFX) pipeline that uses Apache Beam
for data processing, which in turn runs on the Flink Runner (basically a
batch job on a Flink 1.13.6 Session Cluster on Kubernetes), but the job
(for gathering stats) gets stuck.

There is nothing significant in the Job Manager or Task Manager logs. The
only thing that might indicate why the task is stuck is this thread dump:

"MapPartition (MapPartition at [14]{TFXIORead[train],
GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
java.util.concurrent.CompletableFuture$Signaller@6f078632
    at sun.misc.Unsafe.park(Native Method)
    - waiting on java.util.concurrent.CompletableFuture$Signaller@6f078632
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(
CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323
)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture
.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:
1908)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
    at org.apache.beam.runners.fnexecution.control.
SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:
504)
    ...
I use a parallelism of 32. The Task Managers are configured so that each TM
runs in one container with 1 CPU and total process memory set to 20 GB. Each
TM runs 1 task slot.
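The Task Manager side of this corresponds roughly to the flink-conf.yaml
entries below (a sketch, not the exact config; key names are per Flink 1.13,
and how the 1 CPU is enforced depends on whether the TMs are plain Kubernetes
pods or Flink's native Kubernetes integration):

taskmanager.numberOfTaskSlots: 1
taskmanager.memory.process.size: 20g
# With standalone pods the CPU cap comes from the pod's resource
# requests/limits; with native Kubernetes it would instead be:
# kubernetes.taskmanager.cpu: 1.0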

This is failing with ~100 files with a total size of about 100 GB. If I run
the pipeline with a smaller number of files to process, it runs ok.

I need Flink to be able to process different amounts of data, as it can
scale by automatically adding pods depending on the parallelism setting for
the specific job (I set the parallelism to max(number of files, 32)).
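For illustration, this is roughly how a parallelism like that can be passed
to the Beam Flink runner (a minimal sketch with placeholder endpoints; in our
case TFX builds the pipeline itself and such options go in via
beam_pipeline_args):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

num_files = 100  # hypothetical number of input files
parallelism = max(num_files, 32)

# Placeholder options; the real job receives these through TFX's beam_pipeline_args.
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=localhost:8081',         # assumed address of the session cluster
    '--environment_type=EXTERNAL',           # SDK harness runs as a separate worker container
    '--environment_config=localhost:50000',  # assumed worker pool address
    '--parallelism=%d' % parallelism,
])

with beam.Pipeline(options=options) as p:
    # Stand-in for the real TFXIORead / GenerateStatistics transforms.
    _ = p | 'Placeholder' >> beam.Create(['example element'])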

Thanks,
Gorjan

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Gorjan,

sorry for the delay. What is the input of the pipeline? Does the job stop 
reading the source, or does it stop processing the elements? Can you verify 
that the TF code is not just busy doing computation?
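One crude way to tell blocked from busy is to dump the Python thread stacks 
inside the SDK harness container (e.g. with py-spy), or with a small helper 
like this sketch (hypothetical code to wire into the worker/user code):

import sys
import threading
import time
import traceback

def dump_stacks_periodically(interval_sec=60):
    """Print every Python thread's stack to stderr every interval_sec seconds.

    If the stacks keep showing TF/statistics computation frames, the worker
    is busy; if they sit in gRPC reads or other waits, it is blocked.
    """
    def _loop():
        while True:
            time.sleep(interval_sec)
            for thread_id, frame in sys._current_frames().items():
                print(f"--- thread {thread_id} ---", file=sys.stderr)
                traceback.print_stack(frame, file=sys.stderr)

    threading.Thread(target=_loop, daemon=True, name="stack-dumper").start()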

  Jan

On 6/3/22 11:05, Gorjan Todorovski wrote:
> Hi Jan,
>
> This is a batch job so no windows. It is basically a job launched by a 
> TFX component, so I don't have control over Beam code being executed.
> I conclude that the job is stuck, since the number of bytes and 
> processed rows do not move for a long time on a specific task and 
> subtask (always the same one).
>
> Thanks,
> Gorjan
>
>
> On Thu, Jun 2, 2022 at 4:45 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>     -user@flink <ht...@flink.apache.org> as this looks like
>     purely beam issue
>
>     Could you please elaborate more about what "stuck" means? Does the
>     watermark stop progressing? Does that happen at any specific
>     instant (e.g. end of window or end of window + allowed lateness)?
>
>     On 6/1/22 15:43, Gorjan Todorovski wrote:
>>     Hi Jan,
>>
>>     I have not checked the harness log. I have now checked it *Apache
>>     Beam worker log) and found this, but currently not sure what it
>>     means:
>>
>>     2022/06/01 13:34:40 Python exited: <nil>
>>     2022/06/01 13:34:41 Python exited: <nil>
>>     Exception in thread read_grpc_client_inputs:
>>     Traceback (most recent call last):
>>       File "/usr/local/lib/python3.7/threading.py", line 926, in
>>     _bootstrap_inner
>>         self.run()
>>       File "/usr/local/lib/python3.7/threading.py", line 870, in run
>>         self._target(*self._args, **self._kwargs)
>>       File
>>     "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
>>     line 587, in <lambda>
>>         target=lambda: self._read_inputs(elements_iterator),
>>       File
>>     "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
>>     line 570, in _read_inputs
>>         for elements in elements_iterator:
>>       File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py",
>>     line 416, in __next__
>>         return self._next()
>>       File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py",
>>     line 803, in _next
>>         raise self
>>     grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous
>>     of RPC that terminated with:
>>     status = StatusCode.CANCELLED
>>     details = "Multiplexer hanging up"
>>     debug_error_string =
>>     "{"created":"@1654090485.252525992","description":"Error received
>>     from peer ipv4:127.0.0.1:44439
>>     <http://127.0.0.1:44439>","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer
>>     hanging up","grpc_status":1}"
>>     >
>>
>>     2022/06/01 13:34:45 Python exited: <nil>
>>     2022/06/01 13:34:46 Python exited: <nil>
>>     2022/06/01 13:34:46 Python exited: <nil>
>>     2022/06/01 13:34:47 Python exited: <nil>
>>     Starting worker with command ['/opt/apache/beam/boot',
>>     '--id=3-1', '--logging_endpoint=localhost:44267',
>>     '--artifact_endpoint=localhost:36413',
>>     '--provision_endpoint=localhost:42179',
>>     '--control_endpoint=localhost:38825']
>>     Starting worker with command ['/opt/apache/beam/boot',
>>     '--id=3-3', '--logging_endpoint=localhost:38683',
>>     '--artifact_endpoint=localhost:44867',
>>     '--provision_endpoint=localhost:34833',
>>     '--control_endpoint=localhost:44351']
>>     Starting worker with command ['/opt/apache/beam/boot',
>>     '--id=3-2', '--logging_endpoint=localhost:35391',
>>     '--artifact_endpoint=localhost:46571',
>>     '--provision_endpoint=localhost:44073',
>>     '--control_endpoint=localhost:44133']
>>     Starting work...
>>
>>     On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>         Hi Gorjan,
>>
>>         +user@beam <ma...@beam.apache.org>
>>
>>         The trace you posted is just waiting for a bundle to finish
>>         in the SDK harness. I would suspect there is a problem in the
>>         logs of the harness. Did you look for possible errors there?
>>
>>          Jan
>>
>>         On 5/31/22 13:54, Gorjan Todorovski wrote:
>>>         Hi,
>>>
>>>         I am running a TensorFlow Extended (TFX) pipeline which uses
>>>         Apache Beam for data processing which in turn has a Flink
>>>         Runner (Basically a batch job on a Flink Session Cluster on
>>>         Kubernetes) version 1.13.6, but the job (for gathering
>>>         stats) gets stuck.
>>>
>>>         There is nothing significant in the Job Manager or Task
>>>         Manager logs. The only thing that possibly might tell why
>>>         the task is stuck seems to be a thread dump:
>>>
>>>         "MapPartition (MapPartition at [14]{TFXIORead[train],
>>>         GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
>>>         java.util.concurrent.CompletableFuture$Signaller@6f078632
>>>             at sun.misc.Unsafe.park(Native Method)
>>>             - waiting on
>>>         java.util.concurrent.CompletableFuture$Signaller@6f078632
>>>             at
>>>         java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>             at
>>>         java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>>>             at
>>>         java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>>             at
>>>         java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>>>             at
>>>         java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>>             at
>>>         org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>>>             at
>>>         org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>>>             ...
>>>         I use 32 parallel degrees. Task managers are set, so each TM
>>>         runs in one container with 1 CPU and a total process memory
>>>         set to 20 GB. Each TM runs 1 tasksslot.
>>>         This is failing with ~100 files with a total size of about
>>>         100 GB. If I run the pipeline with a smaller number of files
>>>         to process, it runs ok.
>>>         I need Flink to be able to process different amounts of data
>>>         as it is able to scale by automatically adding pods
>>>         depending on the parallel degree setting for the specific
>>>         job (I set the parallel degree to the max(number of files,32))
>>>         Thanks,
>>>         Gorjan
>>

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

Posted by Gorjan Todorovski <go...@gmail.com>.
Hi Jan,

This is a batch job, so there are no windows. It is basically a job launched
by a TFX component, so I don't have control over the Beam code being executed.
I conclude that the job is stuck because the number of bytes and processed
rows do not change for a long time on a specific task and subtask (always the
same one).
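One way to watch those numbers outside the web UI would be something like
this sketch against the Flink REST API (hypothetical; the address and job id
are placeholders, and the metric field names should be checked against the
1.13 REST docs):

import requests  # assumes the requests package is available

FLINK_REST = "http://localhost:8081"   # placeholder JobManager REST address
JOB_ID = "replace-with-real-job-id"    # placeholder

# The job details response lists one entry per vertex (task) with aggregated
# I/O metrics; if read-records / read-bytes stop changing between polls, that
# vertex is not making progress.
details = requests.get(f"{FLINK_REST}/jobs/{JOB_ID}", timeout=10).json()
for vertex in details.get("vertices", []):
    metrics = vertex.get("metrics", {})
    print(vertex.get("name"), metrics.get("read-records"), metrics.get("read-bytes"))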

Thanks,
Gorjan


On Thu, Jun 2, 2022 at 4:45 PM Jan Lukavský <je...@seznam.cz> wrote:

> -user@flink <ht...@flink.apache.org> as this looks like purely beam
> issue
>
> Could you please elaborate more about what "stuck" means? Does the
> watermark stop progressing? Does that happen at any specific instant (e.g.
> end of window or end of window + allowed lateness)?
> On 6/1/22 15:43, Gorjan Todorovski wrote:
>
> Hi Jan,
>
> I have not checked the harness log. I have now checked it *Apache Beam
> worker log) and found this, but currently not sure what it means:
>
> 2022/06/01 13:34:40 Python exited: <nil>
> 2022/06/01 13:34:41 Python exited: <nil>
> Exception in thread read_grpc_client_inputs:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/threading.py", line 926, in
> _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.7/threading.py", line 870, in run
>     self._target(*self._args, **self._kwargs)
>   File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
> line 587, in <lambda>
>     target=lambda: self._read_inputs(elements_iterator),
>   File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
> line 570, in _read_inputs
>     for elements in elements_iterator:
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line
> 416, in __next__
>     return self._next()
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line
> 803, in _next
>     raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
> that terminated with:
> status = StatusCode.CANCELLED
> details = "Multiplexer hanging up"
> debug_error_string =
> "{"created":"@1654090485.252525992","description":"Error received from peer
> ipv4:127.0.0.1:44439","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer
> hanging up","grpc_status":1}"
> >
>
> 2022/06/01 13:34:45 Python exited: <nil>
> 2022/06/01 13:34:46 Python exited: <nil>
> 2022/06/01 13:34:46 Python exited: <nil>
> 2022/06/01 13:34:47 Python exited: <nil>
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-1',
> '--logging_endpoint=localhost:44267',
> '--artifact_endpoint=localhost:36413',
> '--provision_endpoint=localhost:42179',
> '--control_endpoint=localhost:38825']
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-3',
> '--logging_endpoint=localhost:38683',
> '--artifact_endpoint=localhost:44867',
> '--provision_endpoint=localhost:34833',
> '--control_endpoint=localhost:44351']
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-2',
> '--logging_endpoint=localhost:35391',
> '--artifact_endpoint=localhost:46571',
> '--provision_endpoint=localhost:44073',
> '--control_endpoint=localhost:44133']
> Starting work...
>
> On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Gorjan,
>>
>> +user@beam <us...@beam.apache.org>
>>
>> The trace you posted is just waiting for a bundle to finish in the SDK
>> harness. I would suspect there is a problem in the logs of the harness. Did
>> you look for possible errors there?
>>
>>  Jan
>> On 5/31/22 13:54, Gorjan Todorovski wrote:
>>
>> Hi,
>>
>> I am running a TensorFlow Extended (TFX) pipeline which uses Apache Beam
>> for data processing which in turn has a Flink Runner (Basically a batch job
>> on a Flink Session Cluster on Kubernetes) version 1.13.6, but the job (for
>> gathering stats) gets stuck.
>>
>> There is nothing significant in the Job Manager or Task Manager logs. The
>> only thing that possibly might tell why the task is stuck seems to be a
>> thread dump:
>>
>> "MapPartition (MapPartition at [14]{TFXIORead[train],
>> GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
>> java.util.concurrent.CompletableFuture$Signaller@6f078632
>>     at sun.misc.Unsafe.park(Native Method)
>>     - waiting on java.util.concurrent.CompletableFuture$Signaller@
>> 6f078632
>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>     at java.util.concurrent.CompletableFuture$Signaller.block(
>> CompletableFuture.java:1707)
>>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:
>> 3323)
>>     at java.util.concurrent.CompletableFuture.waitingGet(
>> CompletableFuture.java:1742)
>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:
>> 1908)
>>     at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>>     at org.apache.beam.runners.fnexecution.control.
>> SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient
>> .java:504)
>>     ...
>> I use 32 parallel degrees. Task managers are set, so each TM runs in one
>> container with 1 CPU and a total process memory set to 20 GB. Each TM
>> runs 1 tasks slot.
>> This is failing with ~100 files with a total size of about 100 GB. If I
>> run the pipeline with a smaller number of files to process, it runs ok.
>> I need Flink to be able to process different amounts of data as it is
>> able to scale by automatically adding pods depending on the parallel degree
>> setting for the specific job (I set the parallel degree to the max(number
>> of files,32))
>> Thanks,
>> Gorjan
>>
>>

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

Posted by Jan Lukavský <je...@seznam.cz>.
-user@flink <us...@flink.apache.org> as this looks like a purely Beam issue

Could you please elaborate more about what "stuck" means? Does the 
watermark stop progressing? Does that happen at any specific instant 
(e.g. end of window or end of window + allowed lateness)?

On 6/1/22 15:43, Gorjan Todorovski wrote:
> Hi Jan,
>
> I have not checked the harness log. I have now checked it *Apache Beam 
> worker log) and found this, but currently not sure what it means:
>
> 2022/06/01 13:34:40 Python exited: <nil>
> 2022/06/01 13:34:41 Python exited: <nil>
> Exception in thread read_grpc_client_inputs:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/threading.py", line 926, in 
> _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.7/threading.py", line 870, in run
>     self._target(*self._args, **self._kwargs)
>   File 
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", 
> line 587, in <lambda>
>     target=lambda: self._read_inputs(elements_iterator),
>   File 
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", 
> line 570, in _read_inputs
>     for elements in elements_iterator:
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 
> 416, in __next__
>     return self._next()
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 
> 803, in _next
>     raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of 
> RPC that terminated with:
> status = StatusCode.CANCELLED
> details = "Multiplexer hanging up"
> debug_error_string = 
> "{"created":"@1654090485.252525992","description":"Error received from 
> peer ipv4:127.0.0.1:44439 
> <http://127.0.0.1:44439>","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer 
> hanging up","grpc_status":1}"
> >
>
> 2022/06/01 13:34:45 Python exited: <nil>
> 2022/06/01 13:34:46 Python exited: <nil>
> 2022/06/01 13:34:46 Python exited: <nil>
> 2022/06/01 13:34:47 Python exited: <nil>
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-1', 
> '--logging_endpoint=localhost:44267', 
> '--artifact_endpoint=localhost:36413', 
> '--provision_endpoint=localhost:42179', 
> '--control_endpoint=localhost:38825']
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-3', 
> '--logging_endpoint=localhost:38683', 
> '--artifact_endpoint=localhost:44867', 
> '--provision_endpoint=localhost:34833', 
> '--control_endpoint=localhost:44351']
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-2', 
> '--logging_endpoint=localhost:35391', 
> '--artifact_endpoint=localhost:46571', 
> '--provision_endpoint=localhost:44073', 
> '--control_endpoint=localhost:44133']
> Starting work...
>
> On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>     Hi Gorjan,
>
>     +user@beam <ma...@beam.apache.org>
>
>     The trace you posted is just waiting for a bundle to finish in the
>     SDK harness. I would suspect there is a problem in the logs of the
>     harness. Did you look for possible errors there?
>
>      Jan
>
>     On 5/31/22 13:54, Gorjan Todorovski wrote:
>>     Hi,
>>
>>     I am running a TensorFlow Extended (TFX) pipeline which uses
>>     Apache Beam for data processing which in turn has a Flink Runner
>>     (Basically a batch job on a Flink Session Cluster on Kubernetes)
>>     version 1.13.6, but the job (for gathering stats) gets stuck.
>>
>>     There is nothing significant in the Job Manager or Task Manager
>>     logs. The only thing that possibly might tell why the task is
>>     stuck seems to be a thread dump:
>>
>>     "MapPartition (MapPartition at [14]{TFXIORead[train],
>>     GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
>>     java.util.concurrent.CompletableFuture$Signaller@6f078632
>>         at sun.misc.Unsafe.park(Native Method)
>>         - waiting on
>>     java.util.concurrent.CompletableFuture$Signaller@6f078632
>>         at
>>     java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>         at
>>     java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>>         at
>>     java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>         at
>>     java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>>         at
>>     java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>         at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>>         at
>>     org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>>         ...
>>     I use 32 parallel degrees. Task managers are set, so each TM runs
>>     in one container with 1 CPU and a total process memory set to 20
>>     GB. Each TM runs 1 tasksslot.
>>     This is failing with ~100 files with a total size of about 100
>>     GB. If I run the pipeline with a smaller number of files to
>>     process, it runs ok.
>>     I need Flink to be able to process different amounts of data as
>>     it is able to scale by automatically adding pods depending on the
>>     parallel degree setting for the specific job (I set the parallel
>>     degree to the max(number of files,32))
>>     Thanks,
>>     Gorjan
>

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

Posted by Gorjan Todorovski <go...@gmail.com>.
Hi Jan,

I had not checked the harness log. I have now checked it (the Apache Beam
worker log) and found this, but I am currently not sure what it means:

2022/06/01 13:34:40 Python exited: <nil>
2022/06/01 13:34:41 Python exited: <nil>
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in
_bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
line 587, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
line 570, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416,
in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 803,
in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string =
"{"created":"@1654090485.252525992","description":"Error received from peer
ipv4:127.0.0.1:44439","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer
hanging up","grpc_status":1}"
>

2022/06/01 13:34:45 Python exited: <nil>
2022/06/01 13:34:46 Python exited: <nil>
2022/06/01 13:34:46 Python exited: <nil>
2022/06/01 13:34:47 Python exited: <nil>
Starting worker with command ['/opt/apache/beam/boot', '--id=3-1',
'--logging_endpoint=localhost:44267',
'--artifact_endpoint=localhost:36413',
'--provision_endpoint=localhost:42179',
'--control_endpoint=localhost:38825']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-3',
'--logging_endpoint=localhost:38683',
'--artifact_endpoint=localhost:44867',
'--provision_endpoint=localhost:34833',
'--control_endpoint=localhost:44351']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-2',
'--logging_endpoint=localhost:35391',
'--artifact_endpoint=localhost:46571',
'--provision_endpoint=localhost:44073',
'--control_endpoint=localhost:44133']
Starting work...

On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Gorjan,
>
> +user@beam <us...@beam.apache.org>
>
> The trace you posted is just waiting for a bundle to finish in the SDK
> harness. I would suspect there is a problem in the logs of the harness. Did
> you look for possible errors there?
>
>  Jan
> On 5/31/22 13:54, Gorjan Todorovski wrote:
>
> Hi,
>
> I am running a TensorFlow Extended (TFX) pipeline which uses Apache Beam
> for data processing which in turn has a Flink Runner (Basically a batch job
> on a Flink Session Cluster on Kubernetes) version 1.13.6, but the job (for
> gathering stats) gets stuck.
>
> There is nothing significant in the Job Manager or Task Manager logs. The
> only thing that possibly might tell why the task is stuck seems to be a
> thread dump:
>
> "MapPartition (MapPartition at [14]{TFXIORead[train],
> GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
> java.util.concurrent.CompletableFuture$Signaller@6f078632
>     at sun.misc.Unsafe.park(Native Method)
>     - waiting on java.util.concurrent.CompletableFuture$Signaller@6f078632
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(
> CompletableFuture.java:1707)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:
> 3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture
> .java:1742)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:
> 1908)
>     at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>     at org.apache.beam.runners.fnexecution.control.
> SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:
> 504)
>     ...
> I use 32 parallel degrees. Task managers are set, so each TM runs in one
> container with 1 CPU and a total process memory set to 20 GB. Each TM
> runs 1 tasks slot.
> This is failing with ~100 files with a total size of about 100 GB. If I
> run the pipeline with a smaller number of files to process, it runs ok.
> I need Flink to be able to process different amounts of data as it is able
> to scale by automatically adding pods depending on the parallel degree
> setting for the specific job (I set the parallel degree to the max(number
> of files,32))
> Thanks,
> Gorjan
>
>

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Gorjan,

+user@beam <ma...@beam.apache.org>

The trace you posted is just waiting for a bundle to finish in the SDK 
harness. I would suspect the problem shows up in the logs of the harness. 
Did you look for possible errors there?

  Jan

On 5/31/22 13:54, Gorjan Todorovski wrote:
> Hi,
>
> I am running a TensorFlow Extended (TFX) pipeline which uses Apache 
> Beam for data processing which in turn has a Flink Runner (Basically a 
> batch job on a Flink Session Cluster on Kubernetes) version 1.13.6, 
> but the job (for gathering stats) gets stuck.
>
> There is nothing significant in the Job Manager or Task Manager logs. 
> The only thing that possibly might tell why the task is stuck seems to 
> be a thread dump:
>
> "MapPartition (MapPartition at [14]{TFXIORead[train], 
> GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on 
> java.util.concurrent.CompletableFuture$Signaller@6f078632
>     at sun.misc.Unsafe.park(Native Method)
>     - waiting on java.util.concurrent.CompletableFuture$Signaller@6f078632
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>     at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>     at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>     at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>     ...
> I use 32 parallel degrees. Task managers are set, so each TM runs in 
> one container with 1 CPU and a total process memory set to 20 GB. Each 
> TM runs 1 tasksslot.
> This is failing with ~100 files with a total size of about 100 GB. If 
> I run the pipeline with a smaller number of files to process, it runs ok.
> I need Flink to be able to process different amounts of data as it is 
> able to scale by automatically adding pods depending on the parallel 
> degree setting for the specific job (I set the parallel degree to the 
> max(number of files,32))
> Thanks,
> Gorjan
