Posted to user@beam.apache.org by Juan Carlos Garcia <jc...@gmail.com> on 2019/01/17 14:50:15 UTC

Re: Classloader memory leak on job restart (FlinkRunner)

Nice finding. We are experiencing the same problem (Flink 1.5.4): a few
jobs are dying of OOM in the metaspace after multiple restarts. In our
case we have an HA Flink cluster and are not using YARN for orchestration.

Good job with the diagnosis.

JC

On Thu, Jan 17, 2019 at 3:23 PM Daniel Harper <Da...@bbc.co.uk>
wrote:

> Environment:
>
> BEAM 2.7.0
> Flink 1.5.2
> AWS EMR 5.17.0
> Hadoop YARN for orchestration
>
>
> We’ve noticed the metaspace usage increasing when our Flink job restarts,
> which in turn sometimes causes YARN to kill the container for going beyond
> its physical memory limits. After setting the MaxMetaspaceSize setting and
> making the JVM dump its heap on OOM, we noticed quite a few instances of
> the FlinkUserClassLoader class hanging around, which corresponded with the
> number of restarts that happened.
>
> Originally I posted this issue on the FLINK mailing list here
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/User-ClassLoader-leak-on-job-restart-td25547.html
>
>
>
> After investigation I think this is related to something in the BEAM code,
> or the way BEAM interacts with the Flink class loading mechanism, because I
> can see the following when selecting one of the ‘old’ classloaders -> Path
> to GC Roots using Eclipse MAT in one of the heap dumps
>
>
>
>
>
> This looks to me like this issue
> https://github.com/FasterXML/jackson-databind/issues/1363
>
>
> It sounds like, to resolve it, user code should call
> TypeFactory.defaultInstance().clearCache() when threads are shut down. I’m
> not sure where in the FlinkRunner codebase this should go, though.
>
>
> To try and narrow it down as much as possible/reduce the number of
> dependencies I’ve managed to reproduce this with a really really simple job
> that just reads from a synthetic unbounded source (back-ported from the
> master branch) and does nothing https://github.com/djhworld/streaming-job,
> this will run on a Flink environment.
>
> To reproduce the OOM I just ran the job with MaxMetaspaceSize=125M, and
> then killed a random task manager every 60 seconds, which yielded the
> following
>
>
>
> As you can see, the number of classes increases on each restart, which
> causes the metaspace to grow and eventually causes an OOM.
>
> Is there anything we could do to fix this? I’ve not tested this on > 2.7.0
> because we are waiting for 2.10 to drop so we can run Flink 1.6/1.7 on EMR
>
> With thanks,
>
> Daniel
>
>
>
>
>
>
>
>
> ----------------------------
>
> http://www.bbc.co.uk
> This e-mail (and any attachments) is confidential and may contain personal
> views which are not the views of the BBC unless specifically stated.
> If you have received it in error, please delete it from your system.
> Do not use, copy or disclose the information in any way nor act in
> reliance on it and notify the sender immediately.
> Please note that the BBC monitors e-mails sent or received.
> Further communication will signify your consent to this.
>
> ---------------------
>


-- 

JC

Re: Classloader memory leak on job restart (FlinkRunner)

Posted by Daniel Harper <Da...@bbc.co.uk>.
Hi Maximilian,

Thanks for your help on this and for merging the PR. I understand that
our setup is off the normal path (i.e. using the parent-first class
loader), so it’s definitely appreciated! :)

Not sure why you weren’t able to reproduce the problem using that simple
job. Maybe, as you mentioned later, the behaviour stems from the way YARN
includes JARs, rather than from using Flink directly.

We will take a look at tuning the YARN parameters to add further
mitigations for our issues, but we’re definitely looking forward to when
2.10 drops. Hopefully EMR will support Flink 1.7 too!

Daniel















Re: Classloader memory leak on job restart (FlinkRunner)

Posted by Maximilian Michels <mx...@apache.org>.
Thanks again for providing the project to reproduce the error. Unfortunately, I 
could not reproduce the error with your instructions. I've tried killing 
TaskManagers with the restricted MetaSpace memory but I did not run out of 
MetaSpace memory in the remaining TaskManager. I did not use YARN.

I saw your bug report in FLINK [1]. Some comments on that:

First of all, your initial issue is unrelated to Beam; it is caused by the
Metaspace growing and YARN killing your application [2]. The Metaspace grows
because Flink does not limit it by default, and when a job is restarted it needs
to temporarily increase the Metaspace size. Limiting it should force a GC
instead. Read on to find out why GC might not work.
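
To watch this growth from inside a JVM, the standard management API can be
queried for the Metaspace pool. The following is only a minimal sketch: the
class name MetaspaceProbe is made up for illustration, and matching the pool by
the name "Metaspace" assumes a HotSpot-style JVM. A limit set with
-XX:MaxMetaspaceSize should show up as the pool's max value.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.Locale;
import java.util.Optional;

/** Hypothetical helper: reads the JVM's Metaspace pool through the standard management API. */
final class MetaspaceProbe {

    /** Returns the first pool whose name contains "Metaspace", if the JVM exposes one. */
    static Optional<MemoryPoolMXBean> metaspacePool() {
        return ManagementFactory.getMemoryPoolMXBeans().stream()
                .filter(pool -> pool.getName().contains("Metaspace"))
                .findFirst();
    }

    /** Formats used/committed/max for logging; the API reports max as -1 when no limit is set. */
    static String describe(MemoryUsage usage) {
        String max = usage.getMax() < 0 ? "unlimited" : (usage.getMax() / 1024) + " KiB";
        return String.format(Locale.ROOT, "used=%d KiB, committed=%d KiB, max=%s",
                usage.getUsed() / 1024, usage.getCommitted() / 1024, max);
    }

    public static void main(String[] args) {
        System.out.println(metaspacePool()
                .map(pool -> pool.getName() + ": " + describe(pool.getUsage()))
                .orElse("No Metaspace pool reported by this JVM"));
    }
}
```

Logging this value around each job restart would show whether the used size
falls back after a GC or keeps climbing.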

You mentioned in the first comment of [1] that you copied the job jar into the
lib folder. This is problematic with "parent-first" class loading because
libraries like Jackson in your job will now be loaded by the parent classloader.
This can then install the Flink job classloader into the Jackson cache. The
cache and Jackson will never be evicted, as they are part of the parent
classloader. Note that this would not be a problem if class loading were
"child-first" and Jackson were part of the user jar.

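One way to check which copy of a library a job is actually using is to print
the defining classloader and code location of one of its classes. This is a
rough sketch using only JDK reflection; the class name ClassOrigin is invented
here, and the Jackson class name is used only as a lookup string, so the code
also runs where Jackson is absent.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: reports which classloader and code location a class came from. */
final class ClassOrigin {

    /** Describes the defining classloader and the jar or directory of the given class. */
    static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        CodeSource source = type.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        return type.getName()
                + " loader=" + (loader == null ? "bootstrap" : loader.getClass().getName())
                + " location=" + (location == null ? "unknown" : location.toString());
    }

    /** Looks the class up by name through the given loader, or reports that it is absent. */
    static String describe(String className, ClassLoader lookup) {
        try {
            // 'false' avoids running static initializers just for a diagnostic lookup.
            return describe(Class.forName(className, false, lookup));
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " not found (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println(describe("com.fasterxml.jackson.databind.type.TypeFactory", context));
        System.out.println(describe(String.class));
    }
}
```

Called from user code inside a job, a loader that is not the user-code
classloader would suggest the class is being resolved from the parent.
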
However,

1) The YARN "cluster" mode does seem to have a problem because it includes the 
jars into the Flink class loader. So really "child-first" vs "parent-first" does 
not matter because it will always fall back to the parent/Flink class loader.

2) You might want to restart the entire YARN cluster on failures in the YARN 
"cluster" mode (see yarn.maximum-failed-containers), or use the "session" mode 
with "child-first".

3) If the above is not feasible, you have to take care of cleaning the Jackson 
cache or any other static cache, like here: https://github.com/apache/beam/pull/7552
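
As an illustration of option 3, the cleanup can be wrapped in a finally block so
that it runs even when the task fails. This sketch is not the code from the
linked pull request. It calls TypeFactory.defaultInstance().clearCache() through
reflection only so that it compiles without Jackson on the classpath; the class
and method names JacksonCacheCleaner, clearTypeFactoryCache and runWithCleanup
are hypothetical. The loader passed in is assumed to be the one that actually
loaded Jackson, since that is the copy holding the cache.

```java
/** Hypothetical helper: clears Jackson's shared TypeFactory cache if Jackson is visible. */
final class JacksonCacheCleaner {

    private static final String TYPE_FACTORY = "com.fasterxml.jackson.databind.type.TypeFactory";

    /**
     * Calls TypeFactory.defaultInstance().clearCache() via reflection.
     * Returns true if the call succeeded, false if Jackson was not found or the call failed.
     */
    static boolean clearTypeFactoryCache(ClassLoader loader) {
        try {
            Class<?> typeFactory = Class.forName(TYPE_FACTORY, true, loader);
            Object instance = typeFactory.getMethod("defaultInstance").invoke(null);
            typeFactory.getMethod("clearCache").invoke(instance);
            return true;
        } catch (ReflectiveOperationException | LinkageError e) {
            return false;
        }
    }

    /** Runs the task and always attempts the cleanup afterwards, even if the task throws. */
    static void runWithCleanup(Runnable task, ClassLoader loader) {
        try {
            task.run();
        } finally {
            clearTypeFactoryCache(loader);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = JacksonCacheCleaner.class.getClassLoader();
        runWithCleanup(() -> System.out.println("task body"), loader);
        System.out.println("cache cleared: " + clearTypeFactoryCache(loader));
    }
}
```

With Jackson as a direct compile-time dependency, the reflective lookup can be
replaced by the plain call TypeFactory.defaultInstance().clearCache().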

I think the subtlety of this behavior warrants that we also merge the fix in 
Beam, as we have seen that clearing the cache gets rid of the problem.
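
The mechanism can be modelled without Flink or Jackson. In the toy sketch below,
TYPE_CACHE stands in for a static cache owned by a library in the parent
classloader, and each URLClassLoader stands in for the user-code classloader of
one job run; all names are invented for illustration. Every simulated restart
leaves one more classloader reachable through the cache, which matches the
observation that the number of leftover classloaders corresponded to the number
of restarts, and clearing the cache releases them all.

```java
import java.lang.reflect.Proxy;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Toy model: a static cache in the parent pins one user classloader per restart. */
final class StaticCacheLeakDemo {

    /** Stands in for a static type cache owned by a library in the parent classloader. */
    static final Map<Class<?>, String> TYPE_CACHE = new ConcurrentHashMap<>();

    /** Simulates one job (re)start: a class from a fresh "user" loader lands in the cache. */
    static void simulateRestart() {
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], StaticCacheLeakDemo.class.getClassLoader());
        // A dynamic proxy class is defined by the loader passed in, so it belongs to userLoader.
        Object proxy = Proxy.newProxyInstance(
                userLoader, new Class<?>[] {Runnable.class}, (p, method, args) -> null);
        TYPE_CACHE.put(proxy.getClass(), "cached type metadata");
    }

    /** Counts the distinct classloaders that are still strongly reachable through the cache. */
    static int pinnedLoaders() {
        return TYPE_CACHE.keySet().stream()
                .map(Class::getClassLoader)
                .collect(Collectors.toSet())
                .size();
    }

    public static void main(String[] args) {
        for (int restart = 1; restart <= 3; restart++) {
            simulateRestart();
            System.out.println("after restart " + restart + ": pinned loaders = " + pinnedLoaders());
        }
        TYPE_CACHE.clear(); // the equivalent of clearing the library's cache on shutdown
        System.out.println("after clear: pinned loaders = " + pinnedLoaders());
    }
}
```

Once nothing else references them, the released loaders and their classes
become eligible for garbage collection, which is what frees Metaspace.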

Cheers,
Max

[1] https://issues.apache.org/jira/browse/FLINK-11205
[2] https://issues.apache.org/jira/browse/FLINK-10317

On 21.01.19 03:46, Daniel Harper wrote:
> "classloader.resolve-order" is set to "parent-first"
> 
> 
> In the simple job that I wrote, the mvn dependency:tree yields the
> following. It looks like beam-sdks-java-core includes jackson-databind,
> and the flink runtime lib shades it
> 
> 
> [INFO] Scanning for projects...
> [WARNING]
> [WARNING] Some problems were encountered while building the effective model for example:streaming-job:jar:1.0-SNAPSHOT
> [WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 60, column 21
> [WARNING]
> [WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
> [WARNING]
> [WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
> [WARNING]
> [INFO]
> [INFO] -----------------------< example:streaming-job >------------------------
> [INFO] Building streaming-job 1.0-SNAPSHOT
> [INFO] --------------------------------[ jar ]---------------------------------
> [INFO]
> [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ streaming-job ---
> [INFO] example:streaming-job:jar:1.0-SNAPSHOT
> [INFO] +- org.apache.beam:beam-sdks-java-core:jar:2.7.0:compile
> [INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile
> [INFO] |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.5:compile
> [INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.5:compile
> [INFO] |  +- org.slf4j:slf4j-api:jar:1.7.25:compile
> [INFO] |  +- org.apache.avro:avro:jar:1.8.2:compile
> [INFO] |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
> [INFO] |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
> [INFO] |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
> [INFO] |  +- org.xerial.snappy:snappy-java:jar:1.1.4:compile
> [INFO] |  +- joda-time:joda-time:jar:2.4:compile
> [INFO] |  \- org.tukaani:xz:jar:1.8:compile
> [INFO] \- org.apache.beam:beam-runners-flink_2.11:jar:2.7.0:compile
> [INFO]    +- org.apache.beam:beam-runners-core-java:jar:2.7.0:compile
> [INFO]    |  +- org.apache.beam:beam-model-pipeline:jar:2.7.0:compile
> [INFO]    |  |  \- com.google.errorprone:error_prone_annotations:jar:2.1.2:compile
> [INFO]    |  \- org.apache.beam:beam-model-fn-execution:jar:2.7.0:compile
> [INFO]    +- org.apache.beam:beam-runners-core-construction-java:jar:2.7.0:compile
> [INFO]    |  \- org.apache.beam:beam-model-job-management:jar:2.7.0:compile
> [INFO]    +- org.apache.beam:beam-runners-java-fn-execution:jar:2.7.0:compile
> [INFO]    |  +- org.apache.beam:beam-sdks-java-fn-execution:jar:2.7.0:compile
> [INFO]    |  \- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:jar:2.7.0:compile
> [INFO]    +- org.slf4j:slf4j-simple:jar:1.7.25:compile
> [INFO]    +- org.apache.commons:commons-compress:jar:1.16.1:compile
> [INFO]    |  \- org.objenesis:objenesis:jar:2.6:compile
> [INFO]    +- args4j:args4j:jar:2.33:compile
> [INFO]    +- org.apache.flink:flink-clients_2.11:jar:1.5.2:compile
> [INFO]    |  +- org.apache.flink:flink-optimizer_2.11:jar:1.5.2:compile
> [INFO]    |  +- commons-cli:commons-cli:jar:1.3.1:compile
> [INFO]    |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [INFO]    |  \- org.apache.flink:force-shading:jar:1.5.2:compile
> [INFO]    +- org.apache.flink:flink-core:jar:1.5.2:compile
> [INFO]    |  +- org.apache.flink:flink-annotations:jar:1.5.2:compile
> [INFO]    |  +- org.apache.flink:flink-shaded-asm:jar:5.0.4-2.0:compile
> [INFO]    |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
> [INFO]    |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
> [INFO]    |  |  \- com.esotericsoftware.minlog:minlog:jar:1.2:compile
> [INFO]    |  \- commons-collections:commons-collections:jar:3.2.2:compile
> [INFO]    +- org.apache.flink:flink-metrics-core:jar:1.5.2:compile
> [INFO]    +- org.apache.flink:flink-java:jar:1.5.2:compile
> [INFO]    |  \- org.apache.commons:commons-math3:jar:3.5:compile
> [INFO]    +- org.apache.flink:flink-runtime_2.11:jar:1.5.2:compile
> [INFO]    |  +- org.apache.flink:flink-queryable-state-client-java_2.11:jar:1.5.2:compile
> [INFO]    |  +- org.apache.flink:flink-hadoop-fs:jar:1.5.2:compile
> [INFO]    |  +- commons-io:commons-io:jar:2.4:compile
> [INFO]    |  +- org.apache.flink:flink-shaded-netty:jar:4.0.27.Final-2.0:compile
> [INFO]    |  +- org.apache.flink:flink-shaded-guava:jar:18.0-2.0:compile
> [INFO]    |  +- org.apache.flink:flink-shaded-jackson:jar:2.7.9-3.0:compile
> [INFO]    |  +- org.javassist:javassist:jar:3.18.2-GA:compile
> [INFO]    |  +- org.scala-lang:scala-library:jar:2.11.12:compile
> [INFO]    |  +- com.typesafe.akka:akka-actor_2.11:jar:2.4.20:compile
> [INFO]    |  |  +- com.typesafe:config:jar:1.3.0:compile
> [INFO]    |  |  \- org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:compile
> [INFO]    |  +- com.typesafe.akka:akka-stream_2.11:jar:2.4.20:compile
> [INFO]    |  |  +- org.reactivestreams:reactive-streams:jar:1.0.0:compile
> [INFO]    |  |  \- com.typesafe:ssl-config-core_2.11:jar:0.2.1:compile
> [INFO]    |  |     \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:compile
> [INFO]    |  +- com.typesafe.akka:akka-protobuf_2.11:jar:2.4.20:compile
> [INFO]    |  +- com.typesafe.akka:akka-slf4j_2.11:jar:2.4.20:compile
> [INFO]    |  +- org.clapper:grizzled-slf4j_2.11:jar:1.0.2:compile
> [INFO]    |  +- com.github.scopt:scopt_2.11:jar:3.5.0:compile
> [INFO]    |  \- com.twitter:chill_2.11:jar:0.7.4:compile
> [INFO]    |     \- com.twitter:chill-java:jar:0.7.4:compile
> [INFO]    \- org.apache.flink:flink-streaming-java_2.11:jar:1.5.2:compile
> [INFO] ------------------------------------------------------------------------
> [INFO] BUILD SUCCESS
> [INFO] ------------------------------------------------------------------------
> [INFO] Total time:  0.928 s
> [INFO] Finished at: 2019-01-21T08:45:08Z
> [INFO] ------------------------------------------------------------------------
> 
> 
> 
> 
> 
> On 18/01/2019, 17:01, "Maximilian Michels" <mx...@apache.org> wrote:
> 
>> Hi Daniel,
>>
>> I did some more debugging. I think the fix we proposed only cures the
>> symptoms. The cause is that your job uses Jackson, which is also a
>> dependency of Flink.
>>
>> So your job ends up using Flink's version of Jackson, which then installs
>> classes from your job in the Jackson cache. Now, this wouldn't be a
>> problem if you shaded your version of Jackson, i.e. renamed the Jackson
>> package.
>>
>> But even without shading, the default behavior of Flink is to load user
>> classes first. Could you please check:
>>
>> 1) Is "classloader.resolve-order" set to "child-first"?
>> 2) Do you include the Jackson library in your user jar?
>>
>> Thanks,
>> Max
>>
>> On 18.01.19 03:40, Daniel Harper wrote:
>>> Thanks for raising this and the PR!
>>>
>>> In our production streaming job we’re using Kinesis, so good shout on
>>> the
>>> UnboundedSupportWrapper.
>>>
>>> On 17/01/2019, 21:08, "Maximilian Michels" <mx...@apache.org> wrote:
>>>
>>>> I'm glad that solved your GC problem. I think dipose() is a good place,
>>>> it is
>>>> meant for cleanup.
>>>>
>>>> In your case the DoFn is a NOOP, so the PipelineOptions are probably
>>>> loaded
>>>> through your UnboundedSource. If both happen to be scheduled in the
>>>> same
>>>> TaskManager that is fine. However, just for precaution we should also
>>>> include
>>>> the cache invalidation in UnboundedSourceWrapper.
>>>>
>>>> This way we should be good for the streaming execution. Will try to get
>>>> this
>>>> into 2.10.0.
>>>>
>>>> Thanks,
>>>> Max
>>>>
>>>> Issue: https://jira.apache.org/jira/browse/BEAM-6460
>>>>
>>>> On 17.01.19 12:50, Daniel Harper wrote:
>>>>> Max, Juan,
>>>>>
>>>>> Just tried patching this class
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/v2.7.0/runners/flink/src/main/java/
>>>>> or
>>>>> g/
>>>>>
>>>>>
>>>>> apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.j
>>>>> av
>>>>> a#
>>>>> L389 and putting the clearCache call in the finally block.
>>>>>
>>>>> Redoing the test causes the GC to kick in (see screenshot)
>>>>>
>>>>> I¹m not sure if this is the best place to put this clean up code
>>>>> though,
>>>>> is this the final place where all BEAM related stuff get terminated?
>>>>>
>>>>> Daniel.
>>>>>
>>>>>
>>>>>
>>>>> On 17/01/2019, 16:18, "Maximilian Michels" <mx...@apache.org> wrote:
>>>>>
>>>>>> Hi Daniel, hi Juan,
>>>>>>
>>>>>> @Daniel Thanks a lot for investigating and reporting the issue.
>>>>>>
>>>>>> Your analysis looks convincing, it may be that Jackson is holding on
>>>>>> to
>>>>>> the
>>>>>> Classloader. Beam uses Jackson to parse the FlinkPipelineOptions.
>>>>>>
>>>>>> Have you already tried to call
>>>>>> TypeFactory.defaultInstance().clearCache()
>>>>>> in a
>>>>>> catch-all block within your synthetic Beam job, before actually
>>>>>> failing?
>>>>>> That
>>>>>> way we could see if the classloader is garbage-collected after a
>>>>>> restart.
>>>>>>
>>>>>> Let me also investigate in the meantime. We are in the progress of
>>>>>> getting the
>>>>>> 2.10.0 release ready with a few pending issues. So it would be a good
>>>>>> time to
>>>>>> fix this issue.
>>>>>>
>>>>>> Thanks,
>>>>>> Max
>>>>>>
>>>>>> On 17.01.19 09:50, Juan Carlos Garcia wrote:
>>>>>>> Nice finding, we are also experiencing the same (Flink 1.5.4)  where
>>>>>>> few jobs
>>>>>>> are dying of OOM for the metaspace as well after multiple restart,
>>>>>>> in
>>>>>>> our case
>>>>>>> we have
>>>>>>> a HA flink cluster and not using YARN for orchestration.
>>>>>>>
>>>>>>> Good job with the diagnosing .
>>>>>>>
>>>>>>> JC
>>>>>>>
>>>>>>> On Thu, Jan 17, 2019 at 3:23 PM Daniel Harper
>>>>>>> <Daniel.Harper@bbc.co.uk
>>>>>>> <ma...@bbc.co.uk>> wrote:
>>>>>>>
>>>>>>>        Environment:
>>>>>>>
>>>>>>>        BEAM 2.7.0
>>>>>>>        Flink 1.5.2
>>>>>>>        AWS EMR 5.17.0
>>>>>>>        Hadoop YARN for orchestration
>>>>>>>
>>>>>>>
>>>>>>>        We¹ve noticed the metaspace usage increasing when our Flink
>>>>>>> job
>>>>>>> restarts,
>>>>>>>        which in turn sometimes causes YARN to kill the container for
>>>>>>> going
>>>>>>> beyond
>>>>>>>        its physical memory limits. After setting the MaxMetaspaceSize
>>>>>>> setting and
>>>>>>>        making the JVM dump its heap on OOM, we noticed quite a few
>>>>>>> instances of the
>>>>>>>        FlinkUserClassLoader class hanging around, which corresponded
>>>>>>> with
>>>>>>> the
>>>>>>>        number of restarts that happened.
>>>>>>>
>>>>>>>        Originally I posted this issue on the FLINK mailing list here
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/U
>>>>>>> se
>>>>>>> r-
>>>>>>> ClassLoader-leak-on-job-restart-td25547.html
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>        After investigation I think this is related to something in
>>>>>>> the
>>>>>>> BEAM code,
>>>>>>>        or the way BEAM interacts with the Flink class loading
>>>>>>> mechanism,
>>>>>>> because I
>>>>>>>        can see the following when selecting one of the Œold¹
>>>>>>> classloaders
>>>>>>> -> Path
>>>>>>>        to GC Roots using Eclipse MAT in one of the heap dumps
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>        This looks to me like this issue
>>>>>>>        https://github.com/FasterXML/jackson-databind/issues/1363
>>>>>>>
>>>>>>>
>>>>>>>        It sounds like to resolve it, user code should call
>>>>>>>        TypeFactory.defaultInstance().clearCache()when threads are
>>>>>>> shutdown. I¹m not
>>>>>>>        sure where in the FlinkRunner codebase this should be though
>>>>>>>
>>>>>>>
>>>>>>>        To try and narrow it down as much as possible/reduce the
>>>>>>> number
>>>>>>> of
>>>>>>>        dependencies I¹ve managed to reproduce this with a really
>>>>>>> really
>>>>>>> simple job
>>>>>>>        that just reads from a synthetic unbounded source (back-ported
>>>>>>> from
>>>>>>> the
>>>>>>>        master branch) and does nothing
>>>>>>> https://github.com/djhworld/streaming-job,
>>>>>>>        this will run on a Flink environment.
>>>>>>>
>>>>>>>        To reproduce the OOM I just ran the job with
>>>>>>> MaxMetaspaceSize=125M,
>>>>>>> and then
>>>>>>>        killed a random task manager every 60 seconds, which yielded
>>>>>>> the
>>>>>>> following
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>        As you can see the number of classes increases on each
>>>>>>> restart,
>>>>>>> which causes
>>>>>>>        the metaspace to increase and eventually cause an OOM.
>>>>>>>
>>>>>>>        Is there anything we could do to fix this? I¹ve not tested
>>>>>>> this
>>>>>>> on
>>>>>>>> 2.7.0
>>>>>>>        because we are waiting for 2.10 to drop so we can run Flink
>>>>>>> 1.6/1.7
>>>>>>> on EMR
>>>>>>>
>>>>>>>        With thanks,
>>>>>>>
>>>>>>>        Daniel
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>        ----------------------------
>>>>>>>
>>>>>>>        http://www.bbc.co.uk <http://www.bbc.co.uk>
>>>>>>>        This e-mail (and any attachments) is confidential and may
>>>>>>> contain
>>>>>>> personal
>>>>>>>        views which are not the views of the BBC unless specifically
>>>>>>> stated.
>>>>>>>        If you have received it in error, please delete it from your
>>>>>>> system.
>>>>>>>        Do not use, copy or disclose the information in any way nor
>>>>>>> act
>>>>>>> in
>>>>>>> reliance
>>>>>>>        on it and notify the sender immediately.
>>>>>>>        Please note that the BBC monitors e-mails sent or received.
>>>>>>>        Further communication will signify your consent to this.
>>>>>>>
>>>>>>>        ---------------------
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> JC
>>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> -----------------------------
>>>>> http://www.bbc.co.uk
>>>>> This e-mail (and any attachments) is confidential and
>>>>> may contain personal views which are not the views of the BBC unless
>>>>> specifically stated.
>>>>> If you have received it in
>>>>> error, please delete it from your system.
>>>>> Do not use, copy or disclose the
>>>>> information in any way nor act in reliance on it and notify the sender
>>>>> immediately.
>>>>> Please note that the BBC monitors e-mails
>>>>> sent or received.
>>>>> Further communication will signify your consent to
>>>>> this.
>>>>> -----------------------------
>>>>>
>>>
>>>
>>>
>>> -----------------------------
>>> http://www.bbc.co.uk
>>> This e-mail (and any attachments) is confidential and
>>> may contain personal views which are not the views of the BBC unless
>>> specifically stated.
>>> If you have received it in
>>> error, please delete it from your system.
>>> Do not use, copy or disclose the
>>> information in any way nor act in reliance on it and notify the sender
>>> immediately.
>>> Please note that the BBC monitors e-mails
>>> sent or received.
>>> Further communication will signify your consent to
>>> this.
>>> -----------------------------
>>>
> 
> 
> 
> -----------------------------
> http://www.bbc.co.uk
> This e-mail (and any attachments) is confidential and
> may contain personal views which are not the views of the BBC unless specifically stated.
> If you have received it in
> error, please delete it from your system.
> Do not use, copy or disclose the
> information in any way nor act in reliance on it and notify the sender
> immediately.
> Please note that the BBC monitors e-mails
> sent or received.
> Further communication will signify your consent to
> this.
> -----------------------------
> 

Re: Classloader memory leak on job restart (FlinkRunner)

Posted by Daniel Harper <Da...@bbc.co.uk>.
"classloader.resolve-order" is set to "parent-first".


In the simple job that I wrote, running mvn dependency:tree yields the
following. It looks like beam-sdks-java-core pulls in jackson-databind
(2.9.5), while the Flink runtime lib depends on a shaded copy
(flink-shaded-jackson 2.7.9-3.0).


[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for example:streaming-job:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 60, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -----------------------< example:streaming-job >------------------------
[INFO] Building streaming-job 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ streaming-job ---
[INFO] example:streaming-job:jar:1.0-SNAPSHOT
[INFO] +- org.apache.beam:beam-sdks-java-core:jar:2.7.0:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.5:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.5:compile
[INFO] |  +- org.slf4j:slf4j-api:jar:1.7.25:compile
[INFO] |  +- org.apache.avro:avro:jar:1.8.2:compile
[INFO] |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
[INFO] |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
[INFO] |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
[INFO] |  +- org.xerial.snappy:snappy-java:jar:1.1.4:compile
[INFO] |  +- joda-time:joda-time:jar:2.4:compile
[INFO] |  \- org.tukaani:xz:jar:1.8:compile
[INFO] \- org.apache.beam:beam-runners-flink_2.11:jar:2.7.0:compile
[INFO]    +- org.apache.beam:beam-runners-core-java:jar:2.7.0:compile
[INFO]    |  +- org.apache.beam:beam-model-pipeline:jar:2.7.0:compile
[INFO]    |  |  \- com.google.errorprone:error_prone_annotations:jar:2.1.2:compile
[INFO]    |  \- org.apache.beam:beam-model-fn-execution:jar:2.7.0:compile
[INFO]    +- org.apache.beam:beam-runners-core-construction-java:jar:2.7.0:compile
[INFO]    |  \- org.apache.beam:beam-model-job-management:jar:2.7.0:compile
[INFO]    +- org.apache.beam:beam-runners-java-fn-execution:jar:2.7.0:compile
[INFO]    |  +- org.apache.beam:beam-sdks-java-fn-execution:jar:2.7.0:compile
[INFO]    |  \- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:jar:2.7.0:compile
[INFO]    +- org.slf4j:slf4j-simple:jar:1.7.25:compile
[INFO]    +- org.apache.commons:commons-compress:jar:1.16.1:compile
[INFO]    |  \- org.objenesis:objenesis:jar:2.6:compile
[INFO]    +- args4j:args4j:jar:2.33:compile
[INFO]    +- org.apache.flink:flink-clients_2.11:jar:1.5.2:compile
[INFO]    |  +- org.apache.flink:flink-optimizer_2.11:jar:1.5.2:compile
[INFO]    |  +- commons-cli:commons-cli:jar:1.3.1:compile
[INFO]    |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO]    |  \- org.apache.flink:force-shading:jar:1.5.2:compile
[INFO]    +- org.apache.flink:flink-core:jar:1.5.2:compile
[INFO]    |  +- org.apache.flink:flink-annotations:jar:1.5.2:compile
[INFO]    |  +- org.apache.flink:flink-shaded-asm:jar:5.0.4-2.0:compile
[INFO]    |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO]    |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
[INFO]    |  |  \- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO]    |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]    +- org.apache.flink:flink-metrics-core:jar:1.5.2:compile
[INFO]    +- org.apache.flink:flink-java:jar:1.5.2:compile
[INFO]    |  \- org.apache.commons:commons-math3:jar:3.5:compile
[INFO]    +- org.apache.flink:flink-runtime_2.11:jar:1.5.2:compile
[INFO]    |  +- org.apache.flink:flink-queryable-state-client-java_2.11:jar:1.5.2:compile
[INFO]    |  +- org.apache.flink:flink-hadoop-fs:jar:1.5.2:compile
[INFO]    |  +- commons-io:commons-io:jar:2.4:compile
[INFO]    |  +- org.apache.flink:flink-shaded-netty:jar:4.0.27.Final-2.0:compile
[INFO]    |  +- org.apache.flink:flink-shaded-guava:jar:18.0-2.0:compile
[INFO]    |  +- org.apache.flink:flink-shaded-jackson:jar:2.7.9-3.0:compile
[INFO]    |  +- org.javassist:javassist:jar:3.18.2-GA:compile
[INFO]    |  +- org.scala-lang:scala-library:jar:2.11.12:compile
[INFO]    |  +- com.typesafe.akka:akka-actor_2.11:jar:2.4.20:compile
[INFO]    |  |  +- com.typesafe:config:jar:1.3.0:compile
[INFO]    |  |  \- org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:compile
[INFO]    |  +- com.typesafe.akka:akka-stream_2.11:jar:2.4.20:compile
[INFO]    |  |  +- org.reactivestreams:reactive-streams:jar:1.0.0:compile
[INFO]    |  |  \- com.typesafe:ssl-config-core_2.11:jar:0.2.1:compile
[INFO]    |  |     \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:compile
[INFO]    |  +- com.typesafe.akka:akka-protobuf_2.11:jar:2.4.20:compile
[INFO]    |  +- com.typesafe.akka:akka-slf4j_2.11:jar:2.4.20:compile
[INFO]    |  +- org.clapper:grizzled-slf4j_2.11:jar:1.0.2:compile
[INFO]    |  +- com.github.scopt:scopt_2.11:jar:3.5.0:compile
[INFO]    |  \- com.twitter:chill_2.11:jar:0.7.4:compile
[INFO]    |     \- com.twitter:chill-java:jar:0.7.4:compile
[INFO]    \- org.apache.flink:flink-streaming-java_2.11:jar:1.5.2:compile
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  0.928 s
[INFO] Finished at: 2019-01-21T08:45:08Z
[INFO] ------------------------------------------------------------------------
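
For readers less familiar with the resolve-order setting mentioned at the top
of this message, here is a rough Java sketch of what a "child-first" lookup
means: the loader tries its own jars before asking the parent, so a Jackson
bundled in the user jar would win over one visible through the parent. The
class name ChildFirstSketch, the helper loadOrNull and the hard-coded "java."
prefix are assumptions made for illustration only. This is not Flink's
implementation, which is more involved.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative only: a class loader that looks in its own URLs before the parent.
class ChildFirstSketch extends URLClassLoader {

    ChildFirstSketch(URL[] userJars, ClassLoader parent) {
        super(userJars, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (name.startsWith("java.")) {
                    // JDK classes always come from the parent chain.
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // Child-first: try the user jars before delegating.
                        c = findClass(name);
                    } catch (ClassNotFoundException notInUserJars) {
                        // Not found locally, fall back to the usual parent-first lookup.
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    // Hypothetical convenience helper: returns null instead of throwing.
    static Class<?> loadOrNull(ClassLoader loader, String name) {
        try {
            return loader.loadClass(name);
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ClassLoader parent = ChildFirstSketch.class.getClassLoader();
        ChildFirstSketch loader = new ChildFirstSketch(new URL[0], parent);
        // With no user jars configured, every lookup ends up at the parent.
        System.out.println(loadOrNull(loader, "java.lang.String"));
        System.out.println(loadOrNull(loader, ChildFirstSketch.class.getName()));
    }
}
```

With "parent-first", the order of the two lookups in the else branch is
effectively reversed, which is why a library visible through the parent can
end up being the one that caches user classes.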





On 18/01/2019, 17:01, "Maximilian Michels" <mx...@apache.org> wrote:

>Hi Daniel,
>
>I did some more debugging. I think the fix we proposed only cures the
>symptoms. The cause is that your job uses Jackson which is also a
>dependency of Flink.
>
>So your job ends up using Flink's version of Jackson which then installs
>classes from your job in the Jackson cache. Now, this wouldn't be a
>problem, if you shaded your version of Jackson, i.e. renamed the Jackson
>package.
>
>But even without shading, the default behavior of Flink is to load user
>classes first. Could you please check:
>
>1) Is "classloader.resolve-order" set to "child-first"?
>2) Do you include the Jackson library in your user jar?
>
>Thanks,
>Max
>



-----------------------------
http://www.bbc.co.uk
This e-mail (and any attachments) is confidential and 
may contain personal views which are not the views of the BBC unless specifically stated.
If you have received it in 
error, please delete it from your system.
Do not use, copy or disclose the 
information in any way nor act in reliance on it and notify the sender 
immediately.
Please note that the BBC monitors e-mails 
sent or received.
Further communication will signify your consent to 
this.
-----------------------------

Re: Classloader memory leak on job restart (FlinkRunner)

Posted by Maximilian Michels <mx...@apache.org>.
Hi Daniel,

I did some more debugging. I think the fix we proposed only cures the symptoms.
The cause is that your job uses Jackson, which is also a dependency of Flink.

So your job ends up using Flink's version of Jackson, which then installs classes
from your job in the Jackson cache. Now, this wouldn't be a problem if you
shaded your version of Jackson, i.e. renamed the Jackson package.

But even without shading, the default behavior of Flink is to load user classes 
first. Could you please check:

1) Is "classloader.resolve-order" set to "child-first"?
2) Do you include the Jackson library in your user jar?

Thanks,
Max
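
To make the mechanism described above concrete, here is a small Java sketch
of how a static cache owned by a parent-loaded library can keep one user
class loader alive per job restart. Everything in it is a stand-in: the map
TYPE_CACHE, the methods runAttempt, pinnedLoaders and clearCache are
hypothetical names, and the cache stores the loader directly, whereas a real
type cache would hold classes or type objects that reference their loader.
In the thread, the real call being discussed is
TypeFactory.defaultInstance().clearCache().

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: models a library-level static cache that outlives job attempts.
class StaticCacheLeakSketch {

    // Stand-in for a static cache inside a library loaded by the parent class loader.
    private static final Map<String, ClassLoader> TYPE_CACHE = new ConcurrentHashMap<>();

    // Each job attempt gets a fresh user class loader; the cache keeps a strong reference to it.
    static void runAttempt(int attempt) {
        ClassLoader userLoader =
            new URLClassLoader(new URL[0], StaticCacheLeakSketch.class.getClassLoader());
        TYPE_CACHE.put("user-type-from-attempt-" + attempt, userLoader);
    }

    // Number of user class loaders still strongly reachable through the cache.
    static int pinnedLoaders() {
        return TYPE_CACHE.size();
    }

    // Dropping the entries removes the strong references, so the loaders become collectable.
    static void clearCache() {
        TYPE_CACHE.clear();
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 3; attempt++) {
            runAttempt(attempt);
        }
        System.out.println("pinned after restarts: " + pinnedLoaders());
        clearCache();
        System.out.println("pinned after clearCache: " + pinnedLoaders());
    }
}
```

The sketch only counts references and does not try to observe garbage
collection, since GC timing is not deterministic.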

On 18.01.19 03:40, Daniel Harper wrote:
> Thanks for raising this and the PR!
> 
> In our production streaming job we’re using Kinesis, so good shout on the
> UnboundedSourceWrapper.
> 

Re: Classloader memory leak on job restart (FlinkRunner)

Posted by Daniel Harper <Da...@bbc.co.uk>.
Thanks for raising this and the PR!

In our production streaming job we're using Kinesis, so good shout on the
UnboundedSourceWrapper.





Re: Classloader memory leak on job restart (FlinkRunner)

Posted by Maximilian Michels <mx...@apache.org>.
I'm glad that solved your GC problem. I think dispose() is a good place for
the call, since it is meant for cleanup.

In your case the DoFn is a no-op, so the PipelineOptions are probably loaded
through your UnboundedSource. If both happen to be scheduled in the same
TaskManager, that is fine. However, as a precaution we should also include
the cache invalidation in UnboundedSourceWrapper.

This way we should be good for the streaming execution. Will try to get this 
into 2.10.0.

Thanks,
Max

Issue: https://jira.apache.org/jira/browse/BEAM-6460


Re: Classloader memory leak on job restart (FlinkRunner)

Posted by Daniel Harper <Da...@bbc.co.uk>.
Max, Juan,

I just tried patching this class
https://github.com/apache/beam/blob/v2.7.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L389
and putting the clearCache call in the finally block.

Redoing the test now causes the GC to kick in (see screenshot).

I'm not sure this is the best place for the cleanup code, though. Is this
the final place where all Beam-related resources get torn down?

Daniel.
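
For readers following along, here is a minimal sketch of the pattern described
above: clearing a JVM-wide static cache in the finally block of an operator's
dispose(). It is not the actual Beam patch. DisposeCleanupSketch, Operator and
TYPE_CACHE are hypothetical stand-ins so that the example runs without Jackson
or Flink on the classpath; in the real code the cleanup call would be
TypeFactory.defaultInstance().clearCache().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class DisposeCleanupSketch {

    // Hypothetical stand-in for a JVM-wide static cache, such as the one
    // behind Jackson's TypeFactory.defaultInstance(). It outlives any job.
    static final Map<String, Class<?>> TYPE_CACHE = new ConcurrentHashMap<>();

    // Hypothetical operator, loosely shaped like a streaming operator.
    static final class Operator {
        private final boolean failOnTeardown;

        Operator(boolean failOnTeardown) {
            this.failOnTeardown = failOnTeardown;
        }

        void open() {
            // Parsing options would populate the static cache with classes
            // that were loaded by the job's user classloader.
            TYPE_CACHE.put("options", Operator.class);
        }

        void dispose() {
            try {
                if (failOnTeardown) {
                    throw new IllegalStateException("simulated teardown failure");
                }
            } finally {
                // Runs on both the normal and the failing path. In the real
                // patch: TypeFactory.defaultInstance().clearCache();
                TYPE_CACHE.clear();
            }
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator(true);
        op.open();
        try {
            op.dispose();
        } catch (IllegalStateException expected) {
            // The teardown failure still propagates to the caller.
        }
        System.out.println("cache entries after dispose: " + TYPE_CACHE.size());
    }
}
```

Putting the call in finally means the cache is emptied even when teardown
itself throws, which is the case that matters for jobs that restart after a
failure.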







Re: Classloader memory leak on job restart (FlinkRunner)

Posted by Maximilian Michels <mx...@apache.org>.
Hi Daniel, hi Juan,

@Daniel Thanks a lot for investigating and reporting the issue.

Your analysis looks convincing; it may be that Jackson is holding on to the
classloader. Beam uses Jackson to parse the FlinkPipelineOptions.

Have you already tried to call TypeFactory.defaultInstance().clearCache() in a 
catch-all block within your synthetic Beam job, before actually failing? That 
way we could see if the classloader is garbage-collected after a restart.

Let me also investigate in the meantime. We are in the process of getting the
2.10.0 release ready, with a few issues still pending, so this would be a good
time to fix it.

Thanks,
Max
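
A rough sketch of the experiment suggested above, for anyone who wants to try
the idea outside a cluster: clear a static cache in a catch-all block before
the job fails, then check through a WeakReference whether the job's
classloader becomes collectable. ClassLoaderLeakCheck, STATIC_CACHE,
simulateJobRun and isCollected are hypothetical names, and the map merely
stands in for Jackson's TypeFactory cache; nothing here is taken from the Beam
or Flink code.

```java
import java.lang.ref.WeakReference;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ClassLoaderLeakCheck {

    // Hypothetical stand-in for a JVM-wide cache such as Jackson's
    // TypeFactory cache. Anything stored here survives job restarts.
    static final Map<String, Object> STATIC_CACHE = new ConcurrentHashMap<>();

    // Simulates one job attempt that fails. Optionally clears the static
    // cache in the catch-all block before the failure would be reported.
    static WeakReference<ClassLoader> simulateJobRun(boolean clearCacheOnFailure) {
        ClassLoader userLoader = new URLClassLoader(new URL[0]);
        try {
            // The cache ends up referencing the job's classloader.
            STATIC_CACHE.put("type-info", userLoader);
            throw new IllegalStateException("simulated task failure");
        } catch (RuntimeException e) {
            if (clearCacheOnFailure) {
                // Real code: TypeFactory.defaultInstance().clearCache();
                STATIC_CACHE.clear();
            }
        }
        return new WeakReference<>(userLoader);
    }

    // Requests GC a few times and reports whether the referent went away.
    // System.gc() is only a hint, so "true" is likely but not guaranteed.
    static boolean isCollected(WeakReference<?> ref) {
        for (int i = 0; i < 10 && ref.get() != null; i++) {
            System.gc();
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return ref.get() == null;
    }

    public static void main(String[] args) {
        System.out.println("without clearing, collected: "
                + isCollected(simulateJobRun(false)));
        System.out.println("with clearing, collected: "
                + isCollected(simulateJobRun(true)));
    }
}
```

Without clearing, the loader stays strongly reachable through the static map,
so the first check reports false. With clearing, the second check usually
reports true on HotSpot, but that depends on the collector honouring
System.gc().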
