Posted to user@flink.apache.org by Ilan Huchansky <il...@start.io> on 2021/12/02 14:50:49 UTC

Unable to create new native thread error

Hi Flink mailing list,

I am Ilan from the Start.io data platform team, and we need some guidance.

We have a flow with the following use case:


  *   We read files from AWS S3 buckets, process them on our cluster, and sink the data into files using the Flink file sink.
  *   The jobs always use the same jar; we uploaded it to every job manager on the cluster.
  *   We are submitting jobs constantly through the REST API.
  *   Each job reads one or more files from S3.
  *   The jobs can run from 20 seconds up to 3.5 hours.
  *   The jobs run in batch mode.
  *   We are running Flink 1.13.1.
  *   We are running in cluster mode using Docker; the same machines are used for the task and job managers.

We keep running into the same error, over and over again. We encounter it in both the job manager and the task manager.

After the cluster has been running for a while, with jobs finishing correctly, the task and job managers fail to operate due to:
Caused by: java.lang.OutOfMemoryError: unable to create new native thread.


We also see sporadic java.lang.NoClassDefFoundError failures; we are not sure whether they are related.

Our setup and configuration are as follows:

  *   5-node cluster running on Docker
  *   Relevant memory config:

jobmanager.memory.heap.size: 1600m
taskmanager.memory.process.size: 231664m
taskmanager.memory.network.fraction: 0.3
taskmanager.memory.jvm-metaspace.size: 10g
jobmanager.memory.jvm-metaspace.size: 2g
taskmanager.memory.framework.off-heap.size: 1g

  *   Host details:

max locked memory       (kbytes, -l) 65536
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
max user processes              (-u) 1547269
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

cat /proc/sys/kernel/threads-max: 3094538
kernel.pid_max = 57344
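
A side note on the host figures above: on Linux every thread takes an ID from the same pool as process IDs, so the number of threads that can exist at once is bounded by the smallest of these limits rather than by threads-max alone. A minimal sketch in Python that compares them, using only the values quoted above (the dictionary labels and the function name are made up for illustration):

```python
# Limits copied from the host details quoted in this message.
HOST_LIMITS = {
    "ulimit -u (max user processes)": 1547269,
    "kernel.threads-max": 3094538,
    "kernel.pid_max": 57344,
}


def tightest_limit(limits):
    """Return the (name, value) pair of the smallest limit."""
    name = min(limits, key=limits.get)
    return name, limits[name]


if __name__ == "__main__":
    name, value = tightest_limit(HOST_LIMITS)
    print(f"tightest limit: {name} = {value}")
```

With these numbers the tightest limit is kernel.pid_max, far below threads-max, so it may be worth checking before raising the per-user limit any further. Whether it is the limit actually being hit here is not established in this thread.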


We have tried increasing the max user processes, and both increasing and decreasing the JVM metaspace size.

Should we keep increasing the max number of processes on the host? Is there a way to limit the number of threads from the Flink config?

What should we do? Any insights?
I can provide more information as needed.

Thanks in advance

 Ilan


Re: Unable to create new native thread error

Posted by David Morávek <dm...@apache.org>.
Maybe the cgroup PID limit could also come into play, as with Kubernetes [1][2]
(/sys/fs/cgroup/pids/pids.max)? How many threads does your TM create
before crashing?

[1] https://kubernetes.io/docs/concepts/policy/pid-limiting/#pod-pid-limits
[2] https://access.redhat.com/discussions/4713291
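
One way to answer both questions is to read the thread count of the TaskManager JVM from /proc and compare it with the cgroup limit. The following is only a sketch in Python: the function names are illustrative, and the pids.max path is the cgroup v1 location mentioned above, which may differ depending on the cgroup version and on whether it is read from inside the container or from the host.

```python
from pathlib import Path


def parse_thread_count(status_text):
    """Extract the 'Threads:' value from the text of /proc/<pid>/status."""
    for line in status_text.splitlines():
        if line.startswith("Threads:"):
            return int(line.split(":", 1)[1].strip())
    raise ValueError("no 'Threads:' line found")


def parse_pids_max(text):
    """Parse a cgroup pids.max file; the literal 'max' means no limit (None)."""
    value = text.strip()
    return None if value == "max" else int(value)


def report(pid, pids_max_path="/sys/fs/cgroup/pids/pids.max"):
    """Return (current thread count of pid, cgroup PID limit or None)."""
    threads = parse_thread_count(Path(f"/proc/{pid}/status").read_text())
    limit_file = Path(pids_max_path)
    limit = parse_pids_max(limit_file.read_text()) if limit_file.exists() else None
    return threads, limit


if __name__ == "__main__":
    import sys

    threads, limit = report(int(sys.argv[1]))
    shown = limit if limit is not None else "unlimited or not found"
    print(f"threads={threads} cgroup pids.max={shown}")
```

Running it periodically with the PID of the TaskManager process would show whether the thread count keeps growing between jobs.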


Re: Unable to create new native thread error

Posted by Ilan Huchansky <il...@start.io>.
Hi David,

We already increased the max number of threads:

sudo cat /proc/sys/kernel/threads-max
3094538

Our cluster runs on Docker; the machines hosting the containers are dedicated to the cluster.
The containers' configuration is pulled from the hosts, so the same number of threads is configured on the task and job managers.

Kind regards,
Ilan

From: David Morávek <dm...@apache.org>
Date: Monday, 13 December 2021 at 16:25
To: Ilan Huchansky <il...@start.io>
Cc: user@flink.apache.org <us...@flink.apache.org>, Start.io SDP <sd...@start.io>
Subject: Re: Unable to create new native thread error
Hi Ilan, can you please check the number of threads on the task managers / OS? As far as I remember, this happens when the system cannot create any more threads (there is a system-wide limit, /proc/sys/kernel/threads-max [1]). Please note that the limit might be exhausted by other processes.

[1] https://man7.org/linux/man-pages/man5/proc.5.html

D.

On Sun, Dec 12, 2021 at 2:53 PM Ilan Huchansky <il...@start.io> wrote:
Hi David,

Sorry for the previous mail; I sent it before it was finished, please ignore it.

We made the changes and are now submitting the jobs using the Flink CLI.
To be more specific:
We use a Docker container with a Flink image that contains the Flink CLI. We submit the jobs with the run command, specifying the job manager we want to submit to. We call this the submitter.
As explained in previous mails, the job managers and task managers run on Docker on separate machines (each machine has 1 task manager and 1 job manager), which are also separate from the submitter.

Unfortunately, we are still seeing the same error:
Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
The behavior is the following:
One task manager crashes; from that point on, submitting new jobs fails with the following error:
Caused by: java.io.IOException: Could not connect to BlobServer at address
Then we see the native thread error on another task manager.

The cluster stays up without running any jobs until we restart the task / job managers.

Our blob related configuration:

  *   blob.server.port: 6124
  *   blob.fetch.num-concurrent: 300
  *   blob.fetch.retries: 20
  *   blob.service.cleanup.interval: 10800

Full stack trace of the submission error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'job_name'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'job_name'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
    at com.startapp.consumer.KafkaStreaming.main(KafkaStreaming.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
    at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
    at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
    at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)
    ... 11 more
Caused by: java.io.IOException: Could not connect to BlobServer at address DOMAIN/IP:PORT
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:102)
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:199)
    at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:82)
    ... 12 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:96)
    ... 14 more
]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 4 more

Could the blob error lead to the native thread error?
Are there any configuration changes we can make to avoid the blob-related errors?
Could the submission method we are currently using trigger the native thread error?


Thanks,
Ilan.


From: Ilan Huchansky <il...@start.io>
Date: Tuesday, 7 December 2021 at 11:22
To: David Morávek <dm...@apache.org>
Cc: user@flink.apache.org <us...@flink.apache.org>, Start.io SDP <sd...@start.io>
Subject: Re: Unable to create new native thread error
Hi David,

In that case, I will start working on using the CLI instead of the REST API right away.

Will update you when I finish.

Thanks for the help,
Ilan.


From: David Morávek <dm...@apache.org>
Date: Monday, 6 December 2021 at 10:34
To: Ilan Huchansky <il...@start.io>
Cc: user@flink.apache.org <us...@flink.apache.org>, Start.io SDP <sd...@start.io>
Subject: Re: Unable to create new native thread error
Hi Ilan,

I think so, using CLI instead of REST API should solve this, as the user code execution would be pulled out to a separate JVM. If you're going to try that, it would be great to hear back whether it has solved your issue.
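
For illustration, a submitter that shells out to the CLI might look roughly like the sketch below. It is written in Python under stated assumptions: the function names, host name, jar path and class name are hypothetical, and only the `flink run` options `--jobmanager` and `--class` are taken from the Flink CLI itself. The point is that the jar's main method then runs in the JVM started by the `flink` command rather than inside the JobManager.

```python
import subprocess


def build_flink_run_command(jobmanager, jar_path, entry_class=None, program_args=()):
    """Assemble a `flink run` invocation targeting a specific JobManager."""
    cmd = ["flink", "run", "--jobmanager", jobmanager]
    if entry_class:
        cmd += ["--class", entry_class]
    cmd.append(jar_path)
    cmd.extend(program_args)
    return cmd


def submit(jobmanager, jar_path, **kwargs):
    """Run the CLI in its own process and return (exit code, stderr text)."""
    result = subprocess.run(
        build_flink_run_command(jobmanager, jar_path, **kwargs),
        capture_output=True,
        text=True,  # decode output so stack traces come back as plain text
    )
    return result.returncode, result.stderr


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    print(build_flink_run_command("jm-host:8081", "/opt/jobs/job.jar",
                                  entry_class="com.example.Main"))
```

Passing `text=True` also keeps captured stack traces readable, instead of a list of byte strings with embedded `\n` escapes.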

As for 1.13.4, there is currently no ongoing effort or concrete plan for the release.

Best,
D.

On Sun, Dec 5, 2021 at 4:06 PM Ilan Huchansky <il...@start.io> wrote:
Hi David,

Thanks for your fast response.

Do you think that changing the submission method, i.e. using the CLI instead of the REST API, could solve the problem?

Another question: I see that the most critical issue (FLINK-25022) is in progress and should be released with version 1.13.4. Do you know when this version is planned to be released?

Thanks again,
Ilan.

From: David Morávek <dm...@apache.org>
Date: Thursday, 2 December 2021 at 17:25
To: Ilan Huchansky <il...@start.io>
Cc: user@flink.apache.org <us...@flink.apache.org>, Start.io SDP <sd...@start.io>
Subject: Re: Unable to create new native thread error
Hi Ilan,

We are aware of multiple issues where web submission can result in classloader / thread-local leaks, which could potentially result in the behavior you're describing. We're working on addressing them.

FLINK-25022 [1]: The most critical one, leaking thread locals.
FLINK-25027 [2]: Only a memory improvement for a particular situation (a lot of small batch jobs); it could be addressed by accounting for it when setting the Metaspace size.
FLINK-25023 [3]: Can leak the classloader of the first job submitted via the REST API (a constant overhead for Metaspace).

In general, web submission differs from a normal submission in that the "main method" of the uploaded jar is executed on the JobManager, and it's really hard to isolate its execution from possible side effects.

[1] https://issues.apache.org/jira/browse/FLINK-25022
[2] https://issues.apache.org/jira/browse/FLINK-25027
[3] https://issues.apache.org/jira/browse/FLINK-25023

Best,
D.



Re: Unable to create new native thread error

Posted by David Morávek <dm...@apache.org>.
Hi Ilan, can you please check the number of threads on the task managers / OS?
As far as I remember, this happens when the system cannot create any more
threads (there is a system-wide limit, */proc/sys/kernel/threads-max* [1]).
Please note that the limit might be exhausted by other processes.

[1] https://man7.org/linux/man-pages/man5/proc.5.html
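
A rough way to check this on a host is sketched below in Python. It is only an illustration, not part of Flink: the function names are made up for this example, and the cgroup paths are assumptions that depend on the Docker / cgroup setup. It sums the per-process "Threads:" values from /proc/<pid>/status and compares the total with the smallest limit it can read. On Linux every thread takes its ID from the same space as process IDs, so kernel.pid_max (and a container's pids.max, if one is set) can cap thread creation before threads-max does.

```python
from pathlib import Path


def read_int(path):
    """Return the integer stored in a one-line /proc or cgroup file, or None if unavailable."""
    try:
        return int(Path(path).read_text().split()[0])
    except (OSError, ValueError, IndexError):
        return None  # missing file, or a non-numeric value such as "max"


def threads_in_status(status_text):
    """Extract the 'Threads:' count from the text of /proc/<pid>/status."""
    for line in status_text.splitlines():
        if line.startswith("Threads:"):
            return int(line.split()[1])
    return 0


def total_threads(proc_root="/proc"):
    """Sum the thread counts of all processes visible under proc_root."""
    total = 0
    for entry in Path(proc_root).iterdir():
        if entry.name.isdigit():
            try:
                total += threads_in_status((entry / "status").read_text())
            except OSError:
                pass  # the process exited while we were scanning
    return total


def effective_limit(*limits):
    """Smallest of the known limits; None entries (unreadable files) are ignored."""
    known = [x for x in limits if x is not None]
    return min(known) if known else None


if __name__ == "__main__":
    limit = effective_limit(
        read_int("/proc/sys/kernel/threads-max"),
        read_int("/proc/sys/kernel/pid_max"),
        # Assumed cgroup locations (v2, then v1); they may differ on your hosts.
        read_int("/sys/fs/cgroup/pids.max"),
        read_int("/sys/fs/cgroup/pids/pids.max"),
    )
    print("threads in use:", total_threads(), "effective limit:", limit)
```

With the values reported earlier in this thread (threads-max 3094538, pid_max 57344), effective_limit(3094538, 57344) returns 57344, so pid_max would be the first of those two limits to be reached. Run inside a container, the script only sees the processes of that container.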

D.

On Sun, Dec 12, 2021 at 2:53 PM Ilan Huchansky <il...@start.io>
wrote:

> Hi David,
>
>
>
> Sorry for the previous mail, sent it before it was finished, please ignore.
>
>
>
> We made the changes, now submitting the jobs using flink CLI.
>
> To be more specific –
>
> We use a Docker container with a Flink image containing the Flink CLI. We
> submit the jobs with the run command, specifying the job manager we want to
> submit to. We call this the submitter.
> As explained in previous mails, the job managers and task managers run in
> Docker on separate machines (each machine has 1 task manager and 1 job
> manager), which are also separate from the submitter.
>
>
>
> Unfortunately, we are still seeing the same error.
>
> Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
>
> The behavior is the following:
>
> One task manager crashes; from that point on, submitting new jobs fails with
> the following error:
>
> Caused by: java.io.IOException: Could not connect to BlobServer at address
>
> Then we see the native thread error on another task manager.
>
>
>
> The cluster is up without running jobs until we restart the task / job
> managers.
>
>
>
> Our blob related configuration:
>
>    - blob.server.port: 6124
>    - blob.fetch.num-concurrent: 300
>    - blob.fetch.retries: 20
>    - blob.service.cleanup.interval: 10800
>
>
>
> full stack trace of the submitting error:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'job_name'.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'job_name'.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
>     at com.startapp.consumer.KafkaStreaming.main(KafkaStreaming.java:84)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
>     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
>     at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
>     at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
>     at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
>     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)
>     ... 11 more
> Caused by: java.io.IOException: Could not connect to BlobServer at address DOMAIN/IP:PORT
>     at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:102)
>     at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:199)
>     at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:82)
>     ... 12 more
> Caused by: java.net.ConnectException: Connection refused (Connection refused)
>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>     at java.net.Socket.connect(Socket.java:607)
>     at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:96)
>     ... 14 more
> ]
>     at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>     ... 4 more
>
>
>
> Could the blob error lead to the native thread error?
>
> Is there any changes on the configuration that we can make to avoid the
> blob related errors?
>
> Can the current submission method that we are using trigger the native
> thread error?
>
>
>
>
> Thanks,
> Ilan.

Re: Unable to create new native thread error

Posted by Ilan Huchansky <il...@start.io>.
Hi David,

Sorry for the previous mail, sent it before it was finished, please ignore.

We made the changes, now submitting the jobs using flink CLI.
To be more specific –
We use a Docker container with a Flink image containing the Flink CLI. We submit the jobs with the run command, specifying the job manager we want to submit to. We call this the submitter.
As explained in previous mails, the job managers and task managers run in Docker on separate machines (each machine has 1 task manager and 1 job manager), which are also separate from the submitter.

Unfortunately, we are still seeing the same error.
Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
The behavior is the following:
One task manager crashes; from that point on, submitting new jobs fails with the following error:
Caused by: java.io.IOException: Could not connect to BlobServer at address
Then we see the native thread error on another task manager.

The cluster is up without running jobs until we restart the task / job managers.
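
To see which threads pile up before a task manager hits the limit, one option is to take a thread dump of the JVM with jstack and group the threads by name. The Python sketch below is only an illustration; the script name in the usage line is hypothetical. It assumes the usual HotSpot dump layout, where each thread's header line starts with the thread name in double quotes.

```python
import re
import sys
from collections import Counter

# Thread header lines in a HotSpot thread dump start with the quoted thread name.
THREAD_HEADER = re.compile(r'^"(?P<name>[^"]+)"')


def group_thread_names(dump_text):
    """Count threads per name, replacing digit runs with '#' so pool members group together."""
    counts = Counter()
    for line in dump_text.splitlines():
        match = THREAD_HEADER.match(line)
        if match:
            counts[re.sub(r"\d+", "#", match.group("name"))] += 1
    return counts


if __name__ == "__main__":
    # Read a thread dump from stdin and print the 15 largest groups.
    for name, count in group_thread_names(sys.stdin.read()).most_common(15):
        print(f"{count:6d}  {name}")
```

Usage, with the process ID of the task manager or job manager JVM: jstack <pid> | python3 group_threads.py. A group that keeps growing across several dumps would point to the component that is creating the threads.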

Our blob related configuration:

  *   blob.server.port: 6124
  *   blob.fetch.num-concurrent: 300
  *   blob.fetch.retries: 20
  *   blob.service.cleanup.interval: 10800
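
Since the submission fails with "Connection refused" on the BlobServer address, a quick check from the submitter is whether anything is listening on the configured blob.server.port at that moment. The Python sketch below is only an illustration: the function name and the host name are placeholders, and a refused connection by itself does not say why the BlobServer is not reachable.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


if __name__ == "__main__":
    # "jobmanager-host" is a placeholder; 6124 is the blob.server.port listed above.
    print(can_connect("jobmanager-host", 6124))
```

If this returns False while the job manager container is still running, the job manager logs around that time would be the next place to look.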

full stack trace of the submitting error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'job_name'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'job_name'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
    at com.startapp.consumer.KafkaStreaming.main(KafkaStreaming.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
    at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
    at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
    at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)
    ... 11 more
Caused by: java.io.IOException: Could not connect to BlobServer at address DOMAIN/IP:PORT
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:102)
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:199)
    at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:82)
    ... 12 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:96)
    ... 14 more
]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 4 more

Could the blob error lead to the native thread error?
Are there any configuration changes we can make to avoid the blob-related errors?
Can the submission method we are currently using trigger the native thread error?


Thanks,
Ilan.


From: Ilan Huchansky <il...@start.io>
Date: Tuesday, 7 December 2021 at 11:22
To: David Morávek <dm...@apache.org>
Cc: user@flink.apache.org <us...@flink.apache.org>, Start.io SDP <sd...@start.io>
Subject: Re: Unable to create new native thread error
Hi David,

In that case, I will start working on using the CLI instead of the REST API right away.

Will update you when I finish.

Thanks for the help,
Ilan.


From: David Morávek <dm...@apache.org>
Date: Monday, 6 December 2021 at 10:34
To: Ilan Huchansky <il...@start.io>
Cc: user@flink.apache.org <us...@flink.apache.org>, Start.io SDP <sd...@start.io>
Subject: Re: Unable to create new native thread error
Hi Ilan,

I think so, using CLI instead of REST API should solve this, as the user code execution would be pulled out to a separate JVM. If you're going to try that, it would be great to hear back whether it has solved your issue.

As for 1.13.4, there is currently no on-going effort / concrete plan on the release.

Best,
D.

On Sun, Dec 5, 2021 at 4:06 PM Ilan Huchansky <il...@start.io> wrote:
Hi David,

Thanks for your fast response.

Do you think that changing the submission method, using the CLI instead of the REST API, could solve the problem?

Another question: I see that the most critical issue (FLINK-25022) is in progress and should be released with version 1.13.4. Do you know when this version is planned to be released?

Thanks again,
Ilan.

From: David Morávek <dm...@apache.org>
Date: Thursday, 2 December 2021 at 17:25
To: Ilan Huchansky <il...@start.io>
Cc: user@flink.apache.org <us...@flink.apache.org>, Start.io SDP <sd...@start.io>
Subject: Re: Unable to create new native thread error
Hi Ilan,

We are aware of multiple issues where web submission can result in classloader / thread-local leaks, which could potentially result in the behavior you're describing. We're working on addressing them.

FLINK-25022 [1]: The most critical one, leaking thread locals.
FLINK-25027 [2]: Is only a memory improvement for a particular situation (a lot of small batch jobs) and could be fixed by accounting for it when setting the Metaspace size.
FLINK-25023 [3]: Can leak the classloader of the first job submitted via the REST API (constant overhead for Metaspace).

In general, web submission differs from a normal submission in that the "main method" of the uploaded jar is executed on the JobManager, and it's really hard to isolate its execution from possible side effects.

[1] https://issues.apache.org/jira/browse/FLINK-25022
[2] https://issues.apache.org/jira/browse/FLINK-25027
[3] https://issues.apache.org/jira/browse/FLINK-25023

Best,
D.



Re: Unable to create new native thread error

Posted by Ilan Huchansky <il...@start.io>.
Hi David,

We made the changes, now submitting the jobs using flink CLI.
To be more specific

Unfortunately, we are still seeing the same error.
Caused by: java.lang.OutOfMemoryError: unable to create new native thread.
The behavior is the following:
One task manager crashes; from that point on, submitting new jobs fails with the following error:
Caused by: java.io.IOException: Could not connect to BlobServer at address
Then we saw the native thread error on another task manager.

The cluster is up without running jobs until we restart some task / job managers.

Our blob related configuration:

  *   blob.server.port: 6124
  *   blob.fetch.num-concurrent: 300
  *   blob.fetch.retries: 20
  *   blob.service.cleanup.interval: 10800

full stack trace of the submitting error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'job_name'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'job_name'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
    at com.startapp.consumer.KafkaStreaming.main(KafkaStreaming.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload job files.
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
    at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
    at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
                java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n',
                java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n',
                java.lang.Thread.run(Thread.java:748)\n',
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.\n',
                org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)\n',
                org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)\n',
\t... 11 more\n',
Caused by: java.io.IOException: Could not connect to BlobServer at address DOMAIN/IP:PORT\n',
                org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:102)\n',
                org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:199)\n',
                org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:82)\n',
\t... 12 more\n',
Caused by: java.net.ConnectException: Connection refused (Connection refused)\n',
                java.net.PlainSocketImpl.socketConnect(Native Method)\n',
                java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)\n',
                java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)\n',
                java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)\n',
                java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n',
                java.net.Socket.connect(Socket.java:607)\n',
                org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:96)\n',
\t... 14 more\n',
]\n',
                org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)\n',
                org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)\n',
                java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)\n',
                java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)\n',
\t... 4 more\n']
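
The innermost cause in the trace above is a refused TCP connection to the BlobServer. It appears to be raised inside JobSubmitHandler, so the connection attempt presumably originates from the JobManager's REST endpoint rather than from the submitting client. A minimal Python sketch of a reachability check that could be run from that host or container follows. The host and port are placeholders (the trace only shows DOMAIN/IP:PORT; the port is whatever `blob.server.port` resolves to on the cluster), and `can_connect` is a hypothetical helper, not part of Flink.

```python
import socket


def can_connect(host: str, port: int, timeout_s: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) can be opened.

    Any OSError (connection refused, timeout, unresolvable name) is
    treated as "not reachable".
    """
    try:
        # create_connection resolves the name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Placeholder address: substitute the BlobServer host and port of the cluster.
    print(can_connect("jobmanager.example.internal", 6124))
```

A False result only shows that nothing accepted the connection at that address from where the check ran; it does not say whether the BlobServer is down, bound to another port, or blocked by container networking.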



Ilan Huchansky ● Big data developer



Re: Unable to create new native thread error

Posted by Ilan Huchansky <il...@start.io>.
Hi David,

In that case, I will start working on using the CLI instead of the REST API right away.

Will update you when I finish.

Thanks for the help,
Ilan.




Re: Unable to create new native thread error

Posted by David Morávek <dm...@apache.org>.
Hi Ilan,

I think so; using the CLI instead of the REST API should solve this, as the
user code execution would be pulled out into a separate JVM. If you're going
to try that, it would be great to hear back whether it has solved your issue.

As for 1.13.4, there is currently no ongoing effort or concrete plan for the
release.

Best,
D.
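
A minimal sketch of driving the CLI from a script, written in Python. It only assembles and runs a `flink run` command; the paths, class name, and program arguments are placeholders, and `build_flink_run_command` and `submit` are hypothetical helper names. `-d` (detached) and `-c` (entry class) are standard `flink run` options.

```python
import shlex
import subprocess
from typing import List, Optional, Sequence


def build_flink_run_command(
    flink_bin: str,
    jar_path: str,
    entry_class: Optional[str] = None,
    program_args: Sequence[str] = (),
    detached: bool = True,
) -> List[str]:
    """Assemble the argv for `flink run` without executing it."""
    cmd = [flink_bin, "run"]
    if detached:
        cmd.append("-d")  # return once the job has been submitted
    if entry_class:
        cmd += ["-c", entry_class]  # class whose main method is invoked
    cmd.append(jar_path)
    cmd += list(program_args)  # everything after the jar goes to the job
    return cmd


def submit(cmd: List[str], timeout_s: float = 300.0) -> subprocess.CompletedProcess:
    """Run the CLI in a child process and capture its output as text."""
    return subprocess.run(
        cmd, capture_output=True, text=True, timeout=timeout_s, check=False
    )


if __name__ == "__main__":
    # Placeholder values, shown only to illustrate the resulting command line.
    example = build_flink_run_command(
        "/opt/flink/bin/flink",
        "/opt/jobs/job.jar",
        entry_class="com.example.Main",
        program_args=["--input", "s3://bucket/key"],
    )
    print(shlex.join(example))
```

With this route the jar's main method should run in the short-lived client JVM started by the CLI, so whatever it leaks goes away when that process exits; the JobManager only receives the resulting JobGraph.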


Re: Unable to create new native thread error

Posted by Ilan Huchansky <il...@start.io>.
Hi David,

Thanks for your fast response.

Do you think that changing the submission method, using the CLI instead of the REST API, could solve the problem?

Another question: I see that the most critical issue (FLINK-25022) is in progress and should be released with version 1.13.4. Do you know when this version is planned to be released?

Thanks again,
Ilan.



Re: Unable to create new native thread error

Posted by David Morávek <dm...@apache.org>.
Hi Ilan,

We are aware of multiple issues where web submission can result in
classloader / thread-local leaks, which could potentially result in the
behavior you're describing. We're working on addressing them.

FLINK-25022 [1]: The most critical one, leaking thread locals.
FLINK-25027 [2]: Only a memory improvement for a particular situation (a
lot of small batch jobs); it could be addressed by accounting for it when
setting the Metaspace size.
FLINK-25023 [3]: Can leak the classloader of the first job submitted via
the REST API (a constant overhead for Metaspace).

In general, web submission differs from a normal submission in that the
"main method" of the uploaded jar is executed on the JobManager, and it's
really hard to isolate its execution from possible side effects.

[1] https://issues.apache.org/jira/browse/FLINK-25022
[2] https://issues.apache.org/jira/browse/FLINK-25027
[3] https://issues.apache.org/jira/browse/FLINK-25023

Best,
D.
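
To check whether a leak of this kind is what exhausts native threads, it may help to watch the thread count of the JobManager and TaskManager processes over time and compare it with the host's limits. A minimal Python sketch, assuming a Linux host: it parses the `Threads:` field of `/proc/<pid>/status` and takes the smallest of the three limits listed in the original post as a rough ceiling. The helper names are hypothetical, and container-level limits (for example a pids limit set by Docker) are not covered.

```python
import re
from pathlib import Path
from typing import Optional


def parse_thread_count(status_text: str) -> Optional[int]:
    """Extract the value of the `Threads:` line from /proc/<pid>/status text."""
    match = re.search(r"^Threads:\s+(\d+)\s*$", status_text, flags=re.MULTILINE)
    return int(match.group(1)) if match else None


def read_thread_count(pid: int) -> Optional[int]:
    """Read the current thread count of a process, or None if unavailable."""
    try:
        return parse_thread_count(Path(f"/proc/{pid}/status").read_text())
    except OSError:
        # The process exited, or /proc is not available on this platform.
        return None


def effective_thread_ceiling(
    threads_max: int, pid_max: int, max_user_processes: int
) -> int:
    """Rough upper bound on threads: the smallest of the three limits.

    threads_max        -> /proc/sys/kernel/threads-max (system-wide)
    pid_max            -> /proc/sys/kernel/pid_max (thread IDs share this space)
    max_user_processes -> `ulimit -u` (per user, counts threads as well)
    """
    return min(threads_max, pid_max, max_user_processes)
```

With the values from the original post (threads-max 3094538, pid_max 57344, max user processes 1547269), the smallest is `kernel.pid_max`, so that may be the limit worth checking first: each thread takes an ID from the same space as process IDs.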
