Posted to user@flink.apache.org by "LINZ, Arnaud" <AL...@bouyguestelecom.fr> on 2015/12/14 09:57:23 UTC

RE: Crash in a simple "mapper style" streaming app likely due to a memory leak ?

Hello,

I did have an off-heap memory leak in my streaming application, due to:
https://issues.apache.org/jira/browse/HADOOP-12007.
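
The fix is essentially to borrow the decompressor from Hadoop's CodecPool and to return it once the file has been read. Roughly like this (a simplified sketch rather than my actual code; the helper class and method names are just illustrative):

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;

/** Illustrative helper: reads a possibly compressed HDFS file without leaking native buffers. */
public final class PooledFileReader {

    public static long countLines(Configuration conf, Path file) throws Exception {
        FileSystem fs = file.getFileSystem(conf);
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
        InputStream raw = fs.open(file);

        if (codec == null) {
            // Uncompressed file: no decompressor involved.
            return count(raw);
        }

        // Borrow a pooled decompressor instead of letting the codec allocate a new one per file.
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        try {
            return count(codec.createInputStream(raw, decompressor));
        } finally {
            // The important part: give the decompressor back so its native buffers are reused, not leaked.
            if (decompressor != null) {
                CodecPool.returnDecompressor(decompressor);
            }
        }
    }

    private static long count(InputStream in) throws Exception {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            long n = 0;
            while (reader.readLine() != null) {
                n++;
            }
            return n;
        }
    }
}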

Now that I use the CodecPool to close that leak, I get the following error under load:

org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:744)
Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    ... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
    at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
    at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
    at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
    at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
    at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
    at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
    ... 10 more


But the JVM heap is fine (monitored with JVisualVM), and the memory footprint of the JVM process is half of what it was back when the leak caused YARN to kill the container.

Note that I have added a “PartitionBy” to my stream before the sink, so my app is no longer a simple “mapper style” app.

Do you know the cause of the error and how to correct it?

Best regards,

Arnaud



From: LINZ, Arnaud
Sent: Friday, November 13, 2015 15:49
To: 'user@flink.apache.org' <us...@flink.apache.org>
Subject: RE: Crash in a simple "mapper style" streaming app likely due to a memory leak ?

Hi Robert,

Thanks, it works with 50% -- at least it gets way past the previous crash point.

In my opinion (I lack real metrics), the part that uses the most memory is the M2 mapper, instantiated once per slot.
The most complex part is the sink (it uses a lot of HDFS files, flushing threads, etc.), but I would expect the “RichSinkFunction” to be instantiated only once per slot? I'm really surprised by that memory usage; I will try attaching a monitoring tool to the YARN JVM to understand it.

How do I set this yarn.heap-cutoff-ratio parameter for a specific application? I don't want to modify the “root-protected” flink-conf.yaml with that value for all users and Flink jobs.

Regards,
Arnaud

From: Robert Metzger [mailto:rmetzger@apache.org]
Sent: Friday, November 13, 2015 15:16
To: user@flink.apache.org
Subject: Re: Crash in a simple "mapper style" streaming app likely due to a memory leak ?

Hi Arnaud,

Can you try running the job again with the configuration value "yarn.heap-cutoff-ratio" set to 0.5?
As you can see, the container has been killed because it used more than 12 GB: "12.1 GB of 12 GB physical memory used;"
You can also see from the logs that we limit the JVM heap space to 9.2 GB: "java -Xms9216m -Xmx9216m"

In an ideal world, we would tell the JVM to limit its memory usage to 12 GB, but sadly the heap space is not the only memory the JVM allocates. It also allocates direct memory and other things outside the heap. Therefore, we give only 75% of the container memory to the heap.
In your case, I assume that each JVM has multiple HDFS clients, a lot of local threads, etc., which is why the memory might not suffice.
With a cutoff ratio of 0.5, we'll only use 6 GB for the heap.
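
To make that arithmetic concrete (assuming the heap is simply container size times (1 - cutoff ratio), which is what the numbers above suggest):

// Back-of-the-envelope check of the heap sizes mentioned above.
public class HeapCutoffExample {
    public static void main(String[] args) {
        int containerMb = 12 * 1024;   // 12 GB YARN container

        // Default cutoff ratio of 0.25: 12288 MB * 0.75 = 9216 MB,
        // i.e. exactly the "-Xms9216m -Xmx9216m" from the logs.
        System.out.println("heap @ 0.25 cutoff: " + heapMb(containerMb, 0.25) + " MB");

        // Suggested cutoff ratio of 0.5: 12288 MB * 0.5 = 6144 MB, i.e. roughly 6 GB.
        System.out.println("heap @ 0.50 cutoff: " + heapMb(containerMb, 0.5) + " MB");
    }

    static int heapMb(int containerMb, double cutoffRatio) {
        return (int) (containerMb * (1.0 - cutoffRatio));
    }
}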

That value might be a bit too high, but I want to make sure that we first identify the issue.
If the job runs with a 50% cutoff, you can try reducing it again towards 25% (that's the default value, contrary to what the documentation says).

I hope that helps.

Regards,
Robert


On Fri, Nov 13, 2015 at 2:58 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
Hello,

I use the brand new 0.10 version and I have problems running a streaming job. My topology is linear: a custom source SC scans a directory and emits HDFS file names; a first mapper M1 opens each file and emits its lines; a filter F filters lines; another mapper M2 transforms them; and a mapper/sink M3->SK stores them in HDFS.

SC->M1->F->M2->M3->SK

The M2 transformer uses a bit of RAM because, when it opens, it loads an 11M-row static table into a hash map to enrich the lines. I use 55 slots on YARN: 11 containers of 12 GB x 5 slots.
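
Schematically, M2 is an enrichment mapper along these lines (heavily simplified; the field layout, key extraction and the way the table is actually loaded are illustrative, not my real code):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

/** Enriches each input line with a value looked up in a large static table loaded at open() time. */
public class EnrichmentMapper extends RichMapFunction<String, String> {

    // Roughly 11 million entries, built once per parallel instance of the mapper.
    private transient Map<String, String> referenceTable;

    @Override
    public void open(Configuration parameters) throws Exception {
        referenceTable = new HashMap<>();
        // In the real job the static table is read (e.g. from HDFS) and put into the map here;
        // the loading code is omitted in this sketch.
    }

    @Override
    public String map(String line) throws Exception {
        String extra = referenceTable.get(extractKey(line));
        return extra == null ? line : line + ";" + extra;
    }

    // Illustrative only: assume the key is the first semicolon-separated field.
    private static String extractKey(String line) {
        int sep = line.indexOf(';');
        return sep < 0 ? line : line.substring(0, sep);
    }
}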

To my understanding, I should not have any memory problem since each record is independent: no join, no key, no aggregation, no window => it's a simple flow mapper, with RAM simply used as a buffer. However, if I submit enough input data, I systematically crash my app with a “Connection unexpectedly closed by remote task manager” exception, and the first error in the YARN log shows that “a container is running beyond physical memory limits”.

If I increase the container size, I simply need to feed in more data to make the crash happen.

Any idea?

Greetings,
Arnaud

_________________________________
Exceptions in Flink dashboard detail :

Root Exception :
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'bt1shli6/172.21.125.31:33186'. This might indicate that the remote task manager was lost.
       at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:119)
(…)

________________________________


The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

RE: Crash in a simple "mapper style" streaming app likely due to a memory leak ?

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,
I've just run into another exception, a java.lang.IndexOutOfBoundsException in the zlib library this time.
Therefore I suspect a problem in my usage of Hadoop's codec pool. I'm investigating and will keep you informed.

Thanks,
Arnaud


From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf of Stephan Ewen
Sent: Monday, December 14, 2015 10:54
To: user@flink.apache.org
Subject: Re: Crash in a simple "mapper style" streaming app likely due to a memory leak ?

Hi!

That is curious. Can you tell us a bit more about your setup?

  - Did you set Flink to use off-heap memory in the config?
  - What parallelism do you run the job with?
  - What Java and Flink versions are you using?

Even better, can you paste the first part of the TaskManager's log (where it prints the environment) here?

Thanks,
Stephan




Re: Crash in a simple "mapper style" streaming app likely due to a memory leak ?

Posted by Stephan Ewen <se...@apache.org>.
Hi!

That is curious. Can you tell us a bit more about your setup?

  - Did you set Flink to use off-heap memory in the config?
  - What parallelism do you run the job with?
  - What Java and Flink versions are you using?

Even better, can you paste the first part of the TaskManager's log (where
it prints the environment) here?

Thanks,
Stephan

