Posted to user-zh@flink.apache.org by 史 正超 <sh...@outlook.com> on 2020/11/16 09:53:33 UTC

Out-of-memory issue in flink-1.11.2

We are using rocksdb, parallelism 5, one TM with 5 slots, and 10 GB of TM memory. Starting the job produces the error below. It had started successfully before; after running for a while it also failed with this out-of-memory error, and now, when resuming the job from the original Kafka offsets, it no longer starts at all.

2020-11-16 17:44:52
java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:326)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
    at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:535)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:264)
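The stack trace shows the Kafka consumer reading from its socket through Java NIO, which hands out temporary direct (off-heap) buffers. As a minimal sketch of the mechanism (the 1 MiB size is arbitrary), the snippet below allocates one direct buffer; such allocations count against the JVM's direct-memory cap (-XX:MaxDirectMemorySize, which Flink 1.11 derives from its off-heap and network memory options) rather than the heap, which is why this error can appear even when heap memory is free:

```java
import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // allocateDirect reserves native memory outside the JVM heap; the JVM
        // tracks the total against -XX:MaxDirectMemorySize and throws
        // "OutOfMemoryError: Direct buffer memory" once that cap is exceeded.
        ByteBuffer buf = ByteBuffer.allocateDirect(1024 * 1024);
        System.out.println(buf.isDirect());  // true: backed by native memory
        System.out.println(buf.capacity());  // 1048576
    }
}
```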



Re: Re: Out-of-memory issue in flink-1.11.2

Posted by Xintong Song <to...@gmail.com>.
In theory there is no hard limit on how many slots a TM can be split into, but higher parallelism does not necessarily mean better performance.
Increasing parallelism increases the job's memory requirements. With too many slots on one TM you can run into heavy GC pressure, insufficient network memory, OOM, and similar problems. In addition, more slots on the same TM means more running tasks, which also puts some pressure on the framework.
I would suggest first observing the TM's CPU usage: only if the job really is short on processing capacity (growing latency, backpressure) while the TM container's (multi-core) CPU utilization stays low should you consider increasing the parallelism.

Thank you~

Xintong Song



On Tue, Nov 17, 2020 at 10:43 AM 史 正超 <sh...@outlook.com> wrote:

> Thanks for the reply, Xintong; I have watched many of your videos. A follow-up question: is there a minimum size for a slot's memory? With limited resources I would like to split the TaskManager's memory into the smallest possible slots to reach the highest parallelism. Is there a reasonable range for this split?
> For example, with 1 TM and 8 GB, is there a limit on how many slots it can be split into?

Re: Re: Out-of-memory issue in flink-1.11.2

Posted by 史 正超 <sh...@outlook.com>.
Thanks for the reply, Xintong; I have watched many of your videos. A follow-up question: is there a minimum size for a slot's memory? With limited resources I would like to split the TaskManager's memory into the smallest possible slots to reach the highest parallelism. Is there a reasonable range for this split?
For example, with 1 TM and 8 GB, is there a limit on how many slots it can be split into?

Re: Re: Out-of-memory issue in flink-1.11.2

Posted by Xintong Song <to...@gmail.com>.
>
> OK, thanks for the reply. A follow-up: can the taskmanager.memory.task.off-heap.size parameter be set dynamically with the code below?
>
> streamTableEnv.getConfig().getConfiguration().setString(key, value);
>

No, that is a cluster-level configuration.

You can set it in the flink-conf.yaml configuration file, or specify it dynamically at job submission time with -yD key=value.


Thank you~

Xintong Song
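As a concrete sketch of the two options above (the 512m value, the job jar, and the main class are placeholders for illustration, not recommendations):

```shell
# Option 1: cluster-wide, in flink-conf.yaml (read at TaskManager startup):
#
#     taskmanager.memory.task.off-heap.size: 512m
#
# Option 2: per-job override at submission time on YARN; -yD passes a
# dynamic property that takes precedence over flink-conf.yaml for this job
flink run -m yarn-cluster \
    -yD taskmanager.memory.task.off-heap.size=512m \
    -c com.example.MyJob my-job.jar
```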




Re: Re: Out-of-memory issue in flink-1.11.2

Posted by Andrew <87...@qq.com>.
No, it cannot be set that way; it has to go through the configuration file.
taskmanager.memory.task.off-heap.size is a TaskManager startup parameter.


streamTableEnv.getConfig().getConfiguration().setString(key, value); is a job runtime setting!




Re: Out-of-memory issue in flink-1.11.2

Posted by 史 正超 <sh...@outlook.com>.
OK, thanks for the reply. A follow-up: can the taskmanager.memory.task.off-heap.size parameter be set dynamically with the code below?

streamTableEnv.getConfig().getConfiguration().setString(key, value);

________________________________
From: Xintong Song <to...@gmail.com>
Sent: 2020-11-16 10:59
To: user-zh <us...@flink.apache.org>
Subject: Re: Out-of-memory issue in flink-1.11.2

That suggests there is no memory leak; the job simply needs more direct memory than it has available.
You can try increasing `taskmanager.memory.task.off-heap.size`, as the error message suggests.
Increasing only the TM's total memory will not help; it does not increase the direct memory available to the job.

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 6:38 PM 史 正超 <sh...@outlook.com> wrote:

> flink-on-yarn, per-job mode. On restart the Kafka group.id is unchanged, so it should resume from the committed offsets, but the job will not start. I am not sure whether a backlog that built up over time is the cause.
> ________________________________
> From: Xintong Song <to...@gmail.com>
> Sent: 2020-11-16 10:11
> To: user-zh <us...@flink.apache.org>
> Subject: Re: Out-of-memory issue in flink-1.11.2
>
> Which deployment mode is this? Standalone?
> After the earlier run failed, were all the TMs restarted when the job was rerun, or were some of the previous TMs reused?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Nov 16, 2020 at 5:53 PM 史 正超 <sh...@outlook.com> wrote:
>
> > 使用的是rocksdb, 并行度是5,1个tm, 5个slot,tm 内存给
> > 10G,启动任务报下面的错误。之前有启动成功过,运行一段时间后,也是报内存溢出,然后接成原来的offset启动任务,直接启动不起来了。
> >
> > [OutOfMemoryError message and stack trace snipped; same as in the original post]
> >
> >
> >
>

Re: flink-1.11.2 out-of-memory problem

Posted by Xintong Song <to...@gmail.com>.
Then there is probably no memory leak; the job just does not have enough direct memory.
As the error message suggests, try increasing `taskmanager.memory.task.off-heap.size`.
Increasing only the TM's total memory will not help; it does not increase the direct memory available to the job.
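Some background on why raising only the total TM memory does not help: as far as I understand Flink 1.11's memory model, the JVM's `-XX:MaxDirectMemorySize` for the TM is derived from the framework/task off-heap and network budgets, so direct allocations (such as the Kafka client's socket buffers in the stack trace) fail once that cap is reached even if plenty of heap remains. A minimal standalone sketch of the failure mode (class name and sizes are invented for the demo):

```java
import java.nio.ByteBuffer;

// Direct (off-heap) allocations are limited by -XX:MaxDirectMemorySize,
// independently of the heap. Run with -XX:MaxDirectMemorySize=16m and the
// 64 MB allocation below throws "OutOfMemoryError: Direct buffer memory";
// with default JVM settings it succeeds.
public class DirectMemoryDemo {
    public static void main(String[] args) {
        try {
            ByteBuffer buf = ByteBuffer.allocateDirect(64 * 1024 * 1024); // 64 MB
            System.out.println("allocated " + buf.capacity() + " bytes of direct memory");
        } catch (OutOfMemoryError e) {
            System.out.println("OutOfMemoryError: " + e.getMessage());
        }
    }
}
```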

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 6:38 PM 史 正超 <sh...@outlook.com> wrote:

> Flink on YARN, per-job mode. On restart the Kafka group.id was unchanged, so it
> should have resumed from the committed offsets, but the job would not start. I wonder whether a backlog that built up over time caused it.
> ________________________________
> From: Xintong Song <to...@gmail.com>
> Sent: November 16, 2020 10:11
> To: user-zh <us...@flink.apache.org>
> Subject: Re: flink-1.11.2 out-of-memory problem
>
> What deployment mode are you using? Standalone?
> When you reran the job after it had failed, were all the TMs restarted, or were some of the previous TMs reused?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Nov 16, 2020 at 5:53 PM 史 正超 <sh...@outlook.com> wrote:
>
> > Using RocksDB, parallelism 5, one TM with 5 slots, and 10 GB of TM memory, the
> > job fails at startup with the error below. It had started successfully before; after running for a while it also hit this OOM, and after restarting from the previous offsets it now fails to start at all.
> >
> > [OutOfMemoryError message and stack trace snipped; same as in the original post]
> >
> >
> >
>

Re: flink-1.11.2 out-of-memory problem

Posted by 史 正超 <sh...@outlook.com>.
Flink on YARN, per-job mode. On restart the Kafka group.id was unchanged, so it should have resumed from the committed offsets, but the job would not start. I wonder whether a backlog that built up over time caused it.
________________________________
From: Xintong Song <to...@gmail.com>
Sent: November 16, 2020 10:11
To: user-zh <us...@flink.apache.org>
Subject: Re: flink-1.11.2 out-of-memory problem

What deployment mode are you using? Standalone?
When you reran the job after it had failed, were all the TMs restarted, or were some of the previous TMs reused?

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 5:53 PM 史 正超 <sh...@outlook.com> wrote:

> Using RocksDB, parallelism 5, one TM with 5 slots, and 10 GB of TM memory, the
> job fails at startup with the error below. It had started successfully before; after running for a while it also hit this OOM, and after restarting from the previous offsets it now fails to start at all.
>
> [OutOfMemoryError message and stack trace snipped; same as in the original post]
>
>
>

Re: flink-1.11.2 out-of-memory problem

Posted by Xintong Song <to...@gmail.com>.
What deployment mode are you using? Standalone?
When you reran the job after it had failed, were all the TMs restarted, or were some of the previous TMs reused?

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 5:53 PM 史 正超 <sh...@outlook.com> wrote:

> Using RocksDB, parallelism 5, one TM with 5 slots, and 10 GB of TM memory, the
> job fails at startup with the error below. It had started successfully before; after running for a while it also hit this OOM, and after restarting from the previous offsets it now fails to start at all.
>
> [OutOfMemoryError message and stack trace snipped; same as in the original post]
>
>
>