You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by tom yang <en...@gmail.com> on 2023/03/28 19:03:33 UTC

python udf out of memory

Hi,

I am running a standalone cluster setup and submit flinksql job with python
udf following the examples here

<https://github.com/ververica/flink-sql-cookbook/blob/main/udfs/01_python_udfs/01_python_udfs.md>
github.com/ververica/flink-sql-cookbook/blob/main/udfs/01_python_udfs/01_python_udfs.md

I notice that each time I submit the job, cancel and resubmit, eventually
my task manager will throw an out of memory exception. I am sure it is due
to a leaky class loader somewhere but I am not sure how to track it down.
Has anyone experienced this issue before?


2023-03-24 04:55:46,380 ERROR
org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error
occurred while executing the TaskManager. Shutting it down...
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
should be increased. If the error persists (usually in cluster after
several job (re-)submissions) then there is probably a class loading leak
in user code or some of its dependencies which has to be investigated and
fixed. The task executor has to be shutdown... at
java.lang.ClassLoader.defineClass1(Native Method) ~[?:?] at
java.lang.ClassLoader.defineClass(ClassLoader.java:1017) ~[?:?] at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
~[?:?] at java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
~[?:?] at java.net.URLClassLoader$1.run(URLClassLoader.java:458) ~[?:?] at
java.net.URLClassLoader$1.run(URLClassLoader.java:452) ~[?:?] at
java.security.AccessController.doPrivileged(Native Method) ~[?:?] at
java.net.URLClassLoader.findClass(URLClassLoader.java:451) ~[?:?] at
java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?] at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
~[dpi-flink-sql-base-app-0.9.35.jar:?] at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
[dpi-flink-sql-base-app-0.9.35.jar:?] at
java.lang.ClassLoader.loadClass(ClassLoader.java:522) [?:?] at
org.apache.beam.sdk.options.PipelineOptionsFactory.<clinit>(PipelineOptionsFactory.java:500)
[blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238)
[blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
at
org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
[blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
at
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
[blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
at
org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
[blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
[flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
[flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$927/0x0000000800a4ac40.call(Unknown
Source) [flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
[flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
[flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
[flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.runtime.taskmanager.Task$$Lambda$815/0x0000000800904840.run(Unknown
Source) [flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
[flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
[flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
[flink-dist-1.16.1.jar:1.16.1] at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
[flink-dist-1.16.1.jar:1.16.1] at java.lang.Thread.run(Thread.java:829)
[?:?]

Re: python udf out of memory

Posted by tom yang <en...@gmail.com>.
Hi Shammon,

Thank you for your suggestion, currently, the JVM metaspace is set to 3 GB,
but I think it's more indicative of an underlying issue. The user class
loader is leaking. Taking a dump of the task manager, I can see that the
classloader is leaking. Not exactly sure how I can drill down to the root
cause.

[image: image.png]
Thanks,

Tom

On Tue, Mar 28, 2023 at 5:23 PM Shammon FY <zj...@gmail.com> wrote:

> Hi tom
>
> Flink will create individual classloader for each job in task manager.
> When jobs are frequently started and stopped, the usage of memory in
> Metaspace will increase. I found out that your OOM was caused by metaspace.
> I think you can check the size of metaspace and try to increase the size by
> option `taskmanager.memory.jvm-metaspace.size`
>
> Best,
> Shammon FY
>
> On Wed, Mar 29, 2023 at 3:04 AM tom yang <en...@gmail.com> wrote:
>
>> Hi,
>>
>> I am running a standalone cluster setup and submit flinksql job with
>> python udf following the examples here
>>
>>
>> <https://github.com/ververica/flink-sql-cookbook/blob/main/udfs/01_python_udfs/01_python_udfs.md>
>> github.com/ververica/flink-sql-cookbook/blob/main/udfs/01_python_udfs/01_python_udfs.md
>>
>> I notice that each time I submit the job, cancel and resubmit, eventually
>> my task manager will throw an out of memory exception. I am sure it is due
>> to a leaky class loader somewhere but I am not sure how to track it down.
>> Has anyone experienced this issue before?
>>
>>
>> 2023-03-24 04:55:46,380 ERROR
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error
>> occurred while executing the TaskManager. Shutting it down...
>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
>> has occurred. This can mean two things: either the job requires a larger
>> size of JVM metaspace to load classes or there is a class loading leak. In
>> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
>> should be increased. If the error persists (usually in cluster after
>> several job (re-)submissions) then there is probably a class loading leak
>> in user code or some of its dependencies which has to be investigated and
>> fixed. The task executor has to be shutdown... at
>> java.lang.ClassLoader.defineClass1(Native Method) ~[?:?] at
>> java.lang.ClassLoader.defineClass(ClassLoader.java:1017) ~[?:?] at
>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
>> ~[?:?] at java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
>> ~[?:?] at java.net.URLClassLoader$1.run(URLClassLoader.java:458) ~[?:?] at
>> java.net.URLClassLoader$1.run(URLClassLoader.java:452) ~[?:?] at
>> java.security.AccessController.doPrivileged(Native Method) ~[?:?] at
>> java.net.URLClassLoader.findClass(URLClassLoader.java:451) ~[?:?] at
>> java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?] at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>> ~[dpi-flink-sql-base-app-0.9.35.jar:?] at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>> [dpi-flink-sql-base-app-0.9.35.jar:?] at
>> java.lang.ClassLoader.loadClass(ClassLoader.java:522) [?:?] at
>> org.apache.beam.sdk.options.PipelineOptionsFactory.<clinit>(PipelineOptionsFactory.java:500)
>> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
>> at
>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238)
>> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
>> at
>> org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
>> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
>> at
>> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
>> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
>> at
>> org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
>> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>> [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
>> [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$927/0x0000000800a4ac40.call(Unknown
>> Source) [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>> [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
>> [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>> [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.runtime.taskmanager.Task$$Lambda$815/0x0000000800904840.run(Unknown
>> Source) [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>> [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>> [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>> [flink-dist-1.16.1.jar:1.16.1] at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>> [flink-dist-1.16.1.jar:1.16.1] at java.lang.Thread.run(Thread.java:829)
>> [?:?]
>>
>

Re: python udf out of memory

Posted by Shammon FY <zj...@gmail.com>.
Hi tom

Flink will create individual classloader for each job in task manager. When
jobs are frequently started and stopped, the usage of memory in Metaspace
will increase. I found out that your OOM was caused by metaspace. I think
you can check the size of metaspace and try to increase the size by option
`taskmanager.memory.jvm-metaspace.size`

Best,
Shammon FY

On Wed, Mar 29, 2023 at 3:04 AM tom yang <en...@gmail.com> wrote:

> Hi,
>
> I am running a standalone cluster setup and submit flinksql job with
> python udf following the examples here
>
>
> <https://github.com/ververica/flink-sql-cookbook/blob/main/udfs/01_python_udfs/01_python_udfs.md>
> github.com/ververica/flink-sql-cookbook/blob/main/udfs/01_python_udfs/01_python_udfs.md
>
> I notice that each time I submit the job, cancel and resubmit, eventually
> my task manager will throw an out of memory exception. I am sure it is due
> to a leaky class loader somewhere but I am not sure how to track it down.
> Has anyone experienced this issue before?
>
>
> 2023-03-24 04:55:46,380 ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error
> occurred while executing the TaskManager. Shutting it down...
> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
> has occurred. This can mean two things: either the job requires a larger
> size of JVM metaspace to load classes or there is a class loading leak. In
> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
> should be increased. If the error persists (usually in cluster after
> several job (re-)submissions) then there is probably a class loading leak
> in user code or some of its dependencies which has to be investigated and
> fixed. The task executor has to be shutdown... at
> java.lang.ClassLoader.defineClass1(Native Method) ~[?:?] at
> java.lang.ClassLoader.defineClass(ClassLoader.java:1017) ~[?:?] at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
> ~[?:?] at java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
> ~[?:?] at java.net.URLClassLoader$1.run(URLClassLoader.java:458) ~[?:?] at
> java.net.URLClassLoader$1.run(URLClassLoader.java:452) ~[?:?] at
> java.security.AccessController.doPrivileged(Native Method) ~[?:?] at
> java.net.URLClassLoader.findClass(URLClassLoader.java:451) ~[?:?] at
> java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?] at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
> ~[dpi-flink-sql-base-app-0.9.35.jar:?] at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
> [dpi-flink-sql-base-app-0.9.35.jar:?] at
> java.lang.ClassLoader.loadClass(ClassLoader.java:522) [?:?] at
> org.apache.beam.sdk.options.PipelineOptionsFactory.<clinit>(PipelineOptionsFactory.java:500)
> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238)
> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
> at
> org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
> at
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
> at
> org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
> [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$927/0x0000000800a4ac40.call(Unknown
> Source) [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
> [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.runtime.taskmanager.Task$$Lambda$815/0x0000000800904840.run(Unknown
> Source) [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> [flink-dist-1.16.1.jar:1.16.1] at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> [flink-dist-1.16.1.jar:1.16.1] at java.lang.Thread.run(Thread.java:829)
> [?:?]
>