Posted to user-zh@flink.apache.org by 史 正超 <sh...@outlook.com> on 2020/11/16 10:38:21 UTC

Re: OOM problem in flink-1.11.2

flink-on-yarn, per-job mode. On restart the Kafka group.id is unchanged, so it should resume from the committed offsets, but the job fails to start. I wonder whether this is caused by the backlog that built up in the meantime.
________________________________
From: Xintong Song <to...@gmail.com>
Sent: 2020-11-16 10:11
To: user-zh <us...@flink.apache.org>
Subject: Re: OOM problem in flink-1.11.2

What deployment mode are you using? Standalone?
After the job first failed, when you reran it, were all TMs restarted, or were some of the previous TMs reused?

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 5:53 PM 史 正超 <sh...@outlook.com> wrote:

> Using the RocksDB state backend, parallelism 5, 1 TM with 5 slots, and
> 10 GB of TM memory. The job now fails on startup with the error below. It did start successfully before; after running for a while it hit the same OOM, and when resuming from the previous offsets it no longer starts at all.
>
> 2020-11-16 17:44:52
> java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory
> error has occurred. This can mean two things: either job(s) require(s) a
> larger size of JVM direct memory or there is a direct memory leak. The
> direct memory can be allocated by user code or some of its dependencies. In
> this case 'taskmanager.memory.task.off-heap.size' configuration option
> should be increased. Flink framework and its dependencies also consume the
> direct memory, mostly for network communication. The most of network memory
> is managed by Flink and should not result in out-of-memory error. In
> certain special cases, in particular for jobs with high parallelism, the
> framework may require more direct memory which is not managed by Flink. In
> this case 'taskmanager.memory.framework.off-heap.size' configuration option
> should be increased. If the error persists then there is probably a direct
> memory leak in user code or some of its dependencies which has to be
> investigated and fixed. The task executor has to be shutdown...
>     at java.nio.Bits.reserveMemory(Bits.java:658)
>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:326)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
>     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:535)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:264)
>
>
>

Re: Re: OOM problem in flink-1.11.2

Posted by Xintong Song <to...@gmail.com>.
In theory there is no hard limit on how many slots a TM can be split into, but higher parallelism does not automatically mean better performance.
Increasing parallelism increases the job's memory demand. With too many slots on one TM you may run into heavy GC pressure, insufficient network memory, OOMs, and so on. More slots also means more tasks running on the same TM, which adds pressure on the framework itself.
I'd suggest first checking the TM's CPU usage. Only if the job really lacks processing capacity (rising latency, backpressure) while the TM container's (multi-core) CPU utilization stays low should you consider raising the parallelism.

Thank you~

Xintong Song
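To make the trade-off concrete, here is a hypothetical flink-conf.yaml fragment for the setup discussed in this thread (the values are illustrative only, not a recommendation): one TM whose slot count determines how the pooled memory gets divided.

```yaml
# Hypothetical flink-conf.yaml fragment: one 10 GB TaskManager process
# split into 5 slots. Managed memory and network memory are pooled per
# TM, so raising the slot count shrinks the share effectively available
# to each task.
taskmanager.memory.process.size: 10g
taskmanager.numberOfTaskSlots: 5
```

With, say, 8 GB and a much higher slot count, each task's share of managed and network memory shrinks accordingly, which is why very fine-grained splitting tends to hit GC or network-buffer limits first.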



On Tue, Nov 17, 2020 at 10:43 AM 史 正超 <sh...@outlook.com> wrote:

> Thanks for the reply, Xintong — I've watched many of your videos. A quick question: is there a minimum size for a slot's memory? With limited resources, I'd like to split the TaskManager's memory into as many slots as possible to maximize parallelism. Is there a reasonable range for this split?
> For example, for 1 TM with 8 GB, is there a limit on how many slots it can be split into?

Re: Re: OOM problem in flink-1.11.2

Posted by 史 正超 <sh...@outlook.com>.
Thanks for the reply, Xintong — I've watched many of your videos. A quick question: is there a minimum size for a slot's memory? With limited resources, I'd like to split the TaskManager's memory into as many slots as possible to maximize parallelism. Is there a reasonable range for this split?
For example, for 1 TM with 8 GB, is there a limit on how many slots it can be split into?
________________________________
From: Xintong Song <to...@gmail.com>
Sent: 2020-11-17 01:53
To: user-zh <us...@flink.apache.org>
Subject: Re: Re: OOM problem in flink-1.11.2

>
> OK, thanks for the reply. Can taskmanager.memory.task.off-heap.size be set dynamically from code, like this?
>
> streamTableEnv.getConfig().getConfiguration().setString(key, value);
>

No, that is a cluster-level configuration.

You can set it in flink-conf.yaml, or pass it dynamically at job submission time with -yD key=value.


Thank you~

Xintong Song



Re: Re: OOM problem in flink-1.11.2

Posted by Xintong Song <to...@gmail.com>.
>
> OK, thanks for the reply. Can taskmanager.memory.task.off-heap.size be set dynamically from code, like this?
>
> streamTableEnv.getConfig().getConfiguration().setString(key, value);
>

No, that is a cluster-level configuration.

You can set it in flink-conf.yaml, or pass it dynamically at job submission time with -yD key=value.


Thank you~

Xintong Song
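As a concrete sketch of the advice above (the 512m value is only an example, not a recommendation), the cluster-level option goes into flink-conf.yaml:

```yaml
# Hypothetical flink-conf.yaml fragment: raise the direct-memory budget
# for user code and its dependencies, as the OOM message suggests.
taskmanager.memory.task.off-heap.size: 512m
```

For a YARN per-job submission, the same key can instead be passed on the command line, e.g. `flink run -m yarn-cluster -yD taskmanager.memory.task.off-heap.size=512m ...` (remaining arguments omitted).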




Re: Re: OOM problem in flink-1.11.2

Posted by Andrew <87...@qq.com>.
It can't be set that way; it has to go through the configuration file.
taskmanager.memory.task.off-heap.size is a TaskManager startup parameter.


streamTableEnv.getConfig().getConfiguration().setString(key, value); is a job runtime setting!



------------------ Original message ------------------
From: "user-zh" <shizhengchao@outlook.com>
Sent: Monday, 2020-11-16, 19:14
To: "user-zh@flink.apache.org" <user-zh@flink.apache.org>

Subject: Re: OOM problem in flink-1.11.2



OK, thanks for the reply. Can taskmanager.memory.task.off-heap.size be set dynamically from code, like this?

streamTableEnv.getConfig().getConfiguration().setString(key, value);


Re: OOM problem in flink-1.11.2

Posted by 史 正超 <sh...@outlook.com>.
OK, thanks for the reply. Can taskmanager.memory.task.off-heap.size be set dynamically from code, like this?

streamTableEnv.getConfig().getConfiguration().setString(key, value);

________________________________
From: Xintong Song <to...@gmail.com>
Sent: 2020-11-16 10:59
To: user-zh <us...@flink.apache.org>
Subject: Re: OOM problem in flink-1.11.2

Then it's probably not a memory leak; the job simply needs more direct memory than is available.
Try what the error message suggests and increase `taskmanager.memory.task.off-heap.size`.
Increasing the TM's total memory alone won't help — it does not grow the direct memory available to the job.

Thank you~

Xintong Song
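For context on "direct memory allocated by user code" in the error above: it refers to JVM direct (off-heap) buffers such as the ones below. This is a minimal, Flink-free sketch (the class name and sizes are illustrative) of the kind of allocation that counts against the task off-heap budget.

```java
import java.nio.ByteBuffer;

public class DirectBufferSketch {

    // Allocate `count` direct (off-heap) buffers of `bytesEach` bytes and
    // return the total capacity reserved. Allocations like these draw on
    // the JVM's direct-memory limit, which Flink derives in part from
    // taskmanager.memory.task.off-heap.size (plus framework and network
    // components).
    static long allocateDirect(int count, int bytesEach) {
        long total = 0;
        for (int i = 0; i < count; i++) {
            ByteBuffer buf = ByteBuffer.allocateDirect(bytesEach);
            total += buf.capacity();
        }
        return total;
    }

    public static void main(String[] args) {
        // 4 buffers of 1 MiB each: 4 MiB of direct memory in total.
        long reserved = allocateDirect(4, 1024 * 1024);
        System.out.println("direct bytes reserved: " + reserved);
    }
}
```

When such allocations (here, or inside a dependency like the Kafka client) exceed the configured budget, the JVM throws exactly the `OutOfMemoryError: Direct buffer memory` seen in the stack trace.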



On Mon, Nov 16, 2020 at 6:38 PM 史 正超 <sh...@outlook.com> wrote:

> flink-on-yarn, per-job mode. On restart the Kafka group.id was unchanged,
> so consumption should resume from the committed offsets, but the job fails
> to start. Not sure whether backlog built up over time is the cause.

Re: flink-1.11.2 out-of-memory issue

Posted by Xintong Song <to...@gmail.com>.
Then a memory leak is unlikely; the job simply needs more direct memory than it is given.
You can try increasing `taskmanager.memory.task.off-heap.size`, as the error message suggests.
Increasing the TM's total memory alone will not help: it does not grow the direct memory available to the job.

Thank you~

Xintong Song
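The advice above maps onto a flink-conf.yaml fragment roughly like this (sizes are illustrative only, not tuned recommendations; note that task off-heap memory is carved out of the total process size, so raising it leaves less for the other memory components):

```yaml
# flink-conf.yaml -- illustrative sizes, not tuned recommendations

# Total TaskManager memory; raising only this does not grow
# the direct memory available to task code.
taskmanager.memory.process.size: 10g

# Direct (off-heap) memory reserved for user/task code; this is
# the option named in the OutOfMemoryError message.
taskmanager.memory.task.off-heap.size: 1g
```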



On Mon, Nov 16, 2020 at 6:38 PM 史 正超 <sh...@outlook.com> wrote:

> flink-on-yarn, per-job mode. On restart the Kafka group.id was unchanged,
> so consumption should resume from the committed offsets, but the job fails
> to start. Not sure whether backlog built up over time is the cause.