Posted to dev@flink.apache.org by Edward Rojas <ed...@gmail.com> on 2018/05/24 16:21:20 UTC

Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)

Hi all,

I was testing Flink 1.5 rc5 and I found this issue. I'm running a cluster in
HA mode with one jobmanager, several taskmanagers, each one with two task
slots and default parallelism set to 2.

I'm running two jobs: a simple one with a Kafka consumer, a filter and a
sink, and a slightly more complex one with a Kafka consumer, filters,
flatmaps, keyed process functions and sinks.

Both jobs run correctly when they are assigned to run in the 2 slots of the
same taskmanager.

But when one slot is in one taskmanager and the other in a different one,
the simpler job runs correctly but the complex one fails with the
following error:

org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.UnsupportedOperationException: Heap buffer
	at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:170)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:79)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.exceptionCaught(SslHandler.java:697)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: java.lang.UnsupportedOperationException: Heap buffer
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:346)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:229)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	... 9 more
Caused by: java.lang.UnsupportedOperationException: Heap buffer
	at org.apache.flink.runtime.io.network.netty.NettyBufferPool.heapBuffer(NettyBufferPool.java:236)
	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:926)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:315)
	... 11 more


After reading a note in the release notes, I also tested with taskmanagers
having a single task slot each.
But I got the same result: the simple job works correctly and the complex
one fails with the exception above.

Any thoughts?

Regards,
Edward



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Re: Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)

Posted by Edward Rojas <ed...@gmail.com>.
I just tested the workaround and it works.
Thank you

Re: Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)

Posted by Till Rohrmann <tr...@apache.org>.
FYI: https://issues.apache.org/jira/browse/FLINK-9437

We will revert the changes of FLINK-9310 with Flink 1.5.1. For Flink 1.6,
the problem should not arise since we are currently upgrading our Netty
dependency.

Cheers,
Till


Re: Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for reporting the issue, Edward.

Taking a look at Netty's SslHandler, it looks like we introduced this problem
with the update of the cipher algorithms [1]. Apparently, the SslHandler
wants to use an inbound heap byte buffer when using a cipher suite with GCM
enabled [2]. This seems to be fixed in a later version of Netty 4 (we are
using 4.0.27 at the moment). The problem with heap byte buffers is that our
NettyBufferPool does not support allocating them, in order to keep memory
consumption under control.
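
The failure mode described above can be illustrated with a minimal Java
sketch. It is not Flink's NettyBufferPool or Netty's SslHandler: the class
`DirectOnlyAllocatorSketch`, the method `allocateForDecode` and the
`cipherWantsHeap` flag are hypothetical names that only mimic the behaviour
reported in this thread, namely an allocator that hands out direct buffers
but rejects heap buffers, and a decode step that asks for a heap buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a direct-memory-only buffer pool. This is NOT
// Flink's NettyBufferPool; it only mimics the behaviour described above.
class DirectOnlyAllocatorSketch {

    // Direct (off-heap) allocations are allowed.
    static ByteBuffer directBuffer(int capacity) {
        return ByteBuffer.allocateDirect(capacity);
    }

    // Heap allocations are rejected so that memory use stays predictable.
    static ByteBuffer heapBuffer(int capacity) {
        throw new UnsupportedOperationException("Heap buffer");
    }

    // Hypothetical decode step: requests a heap buffer when the (assumed)
    // cipher needs one, otherwise a direct buffer.
    static ByteBuffer allocateForDecode(boolean cipherWantsHeap, int capacity) {
        return cipherWantsHeap ? heapBuffer(capacity) : directBuffer(capacity);
    }

    public static void main(String[] args) {
        System.out.println("direct ok: " + allocateForDecode(false, 1024).isDirect());
        try {
            allocateForDecode(true, 1024);
        } catch (UnsupportedOperationException e) {
            System.out.println("heap request failed: " + e.getMessage());
        }
    }
}
```

In this sketch the direct path succeeds and only the heap path throws,
which would be consistent with the job failing only once the SSL decode
path is exercised.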

As a workaround, you could set `security.ssl.algorithms` to
`TLS_RSA_WITH_AES_128_CBC_SHA` in the Flink configuration. That should make
it work again, at the cost of using a cipher which is no longer recommended.
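
Before applying this workaround it may be useful to check that the JVM
actually offers the suite. The following is a rough sketch using only the
standard `javax.net.ssl` API. The class and method names are made up for
illustration, and the `_GCM_` name check is a heuristic assumed here, not
something Flink or Netty does.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import javax.net.ssl.SSLContext;

// Hypothetical helper, not part of Flink.
class CipherSuiteCheckSketch {

    // Name-based heuristic (an assumption of this sketch): standard names
    // of GCM cipher suites contain the token "_GCM_".
    static boolean usesGcm(String suite) {
        return suite.contains("_GCM_");
    }

    // True if the JVM's default SSLContext lists the suite as supported.
    static boolean supportedByJvm(String suite) throws NoSuchAlgorithmException {
        String[] supported =
                SSLContext.getDefault().getSupportedSSLParameters().getCipherSuites();
        return Arrays.asList(supported).contains(suite);
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Defaults to the suite named in the workaround above.
        String suite = args.length > 0 ? args[0] : "TLS_RSA_WITH_AES_128_CBC_SHA";
        System.out.println(suite
                + " usesGcm=" + usesGcm(suite)
                + " supportedByJvm=" + supportedByJvm(suite));
    }
}
```

Whether a given suite is reported as supported depends on the JVM version
and its security settings, so the result should be checked on the machines
that run the taskmanagers.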

[1] https://issues.apache.org/jira/browse/FLINK-9310
[2] https://github.com/netty/netty/blob/netty-4.0.27.Final/handler/src/main/java/io/netty/handler/ssl/SslHandler.java#L1218

Cheers,
Till


Re: Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)

Posted by Edward Rojas <ed...@gmail.com>.
This may help to narrow down the issue:
If I leave SSL enabled globally but set `taskmanager.data.ssl.enabled: false`,
it works again.

Re: Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)

Posted by Edward Rojas <ed...@gmail.com>.
Regarding heap, the only settings I configure explicitly are
`jobmanager.heap.mb`, `taskmanager.heap.mb` and
`taskmanager.memory.preallocate: false`. All other memory settings
keep their default values.

I just tested and it fails only when SSL is enabled.

Re: Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)

Posted by Stephan Ewen <se...@apache.org>.
Two quick questions:

  - Do you explicitly configure Flink memory on-heap / off-heap?
  - Can you check whether this also happens when SSL is disabled?

