You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 生于八十年代 <62...@qq.com.INVALID> on 2023/02/28 10:30:04 UTC

managed memory占用100%的问题

社区的各位大佬们有个问题咨询一下大家:1. 问题背景:我们在使用flink读取消费kafka中的hdfs路径消息,然后从hdfs中读取文件,做完处理后写入hive,整个过程都是以流式的过程完成,而不是批处理的过程; 目前遇到的问题是任务运行一段时间之后,kafka就开始出现hdfs路径消息积压,目前发现managed memory区域消耗的内存非常大,占用了100%。但是我们目前使用的是hashmap+hdfs的状态后端,写入hive的sql的是10分钟的滚动窗口+group by + sum这样的操作,同时我们开启了minibatch这样的优化选项。



2. 由于我们没有使用rocks db和批处理,按照官网的说法,这一块内存是不占用的,但是我尝试给这个区域配置为0,会报下面空指针的异常。
java.lang.NullPointerException: Initial Segment may not be null
&nbsp;at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init&gt;(AbstractPagedOutputView.java:67)
&nbsp;at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init&gt;(SimpleCollectingOutputView.java:46)
&nbsp;at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap$RecordArea.<init&gt;(AbstractBytesMultiMap.java:226)
&nbsp;at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap.<init&gt;(AbstractBytesMultiMap.java:114)
&nbsp;at org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap.<init&gt;(WindowBytesMultiMap.java:40)
&nbsp;at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.<init&gt;(RecordsWindowBuffer.java:72)
&nbsp;at org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer$Factory.create(RecordsWindowBuffer.java:164)
&nbsp;at org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor.open(AbstractWindowAggProcessor.java:118)
&nbsp;at org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.open(SlicingWindowOperator.java:152)
&nbsp;at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
&nbsp;at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
&nbsp;at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
&nbsp;at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
&nbsp;at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
&nbsp;at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
&nbsp;at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
&nbsp;at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
&nbsp;at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
&nbsp;at java.lang.Thread.run(Thread.java:748)



3. 下面是我们的内存配置:



4. 所以managed区域到底存储了什么东西,占用了这么大的内存?我们的kafka消息积压是否与这里的managed区域占满有关系,希望各位大佬能答疑解惑

Re: managed memory占用100%的问题

Posted by Shammon FY <zj...@gmail.com>.
Hi

根据邮件里的异常信息看了下代码,这里的RecordArea会从managed memory申请内存分片 你可以根据作业流量尝试调整下窗口大小或者内存分配

Best,
Shammon


On Tue, Feb 28, 2023 at 6:47 PM Junrui Lee <jr...@gmail.com> wrote:

> Hi,
>
> 图片挂掉了,能不能直接用文字描述配置文件?
>
> Best,
> Junrui
>
> 生于八十年代 <62...@qq.com.invalid> 于2023年2月28日周二 18:31写道:
>
> > 社区的各位大佬们有个问题咨询一下大家:
> > 1.
> >
> 问题背景:我们在使用flink读取消费kafka中的hdfs路径消息,然后从hdfs中读取文件,做完处理后写入hive,整个过程都是以流式的过程完成,而不是批处理的过程;
> > 目前遇到的问题是任务运行一段时间之后,kafka就开始出现hdfs路径消息积压,目前发现managed
> >
> memory区域消耗的内存非常大,占用了100%。但是我们目前使用的是hashmap+hdfs的状态后端,写入hive的sql的是10分钟的滚动窗口+group
> > by + sum这样的操作,同时我们开启了minibatch这样的优化选项。
> >
> >
> > 2. 由于我们没有使用rocks db和批处理,按照官网的说法,这一块内存是不占用的,但是我尝试给这个区域配置为0,会报下面空指针的异常。
> > java.lang.NullPointerException: Initial Segment may not be null
> >  at
> >
> org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:67)
> >  at
> > org.apache.flink.runtime.io
> .disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:46)
> >  at
> >
> org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap$RecordArea.<init>(AbstractBytesMultiMap.java:226)
> >  at
> >
> org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap.<init>(AbstractBytesMultiMap.java:114)
> >  at
> >
> org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap.<init>(WindowBytesMultiMap.java:40)
> >  at
> >
> org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.<init>(RecordsWindowBuffer.java:72)
> >  at
> >
> org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer$Factory.create(RecordsWindowBuffer.java:164)
> >  at
> >
> org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor.open(AbstractWindowAggProcessor.java:118)
> >  at
> >
> org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.open(SlicingWindowOperator.java:152)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> >  at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> >  at
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >  at
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> >  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >  at java.lang.Thread.run(Thread.java:748)
> >
> > 3. 下面是我们的内存配置:
> >
> >
> > 4.
> >
> 所以managed区域到底存储了什么东西,占用了这么大的内存?我们的kafka消息积压是否与这里的managed区域占满有关系,希望各位大佬能答疑解惑
> >
>

Re: managed memory占用100%的问题

Posted by Junrui Lee <jr...@gmail.com>.
Hi,

图片挂掉了,能不能直接用文字描述配置文件?

Best,
Junrui

生于八十年代 <62...@qq.com.invalid> 于2023年2月28日周二 18:31写道:

> 社区的各位大佬们有个问题咨询一下大家:
> 1.
> 问题背景:我们在使用flink读取消费kafka中的hdfs路径消息,然后从hdfs中读取文件,做完处理后写入hive,整个过程都是以流式的过程完成,而不是批处理的过程;
> 目前遇到的问题是任务运行一段时间之后,kafka就开始出现hdfs路径消息积压,目前发现managed
> memory区域消耗的内存非常大,占用了100%。但是我们目前使用的是hashmap+hdfs的状态后端,写入hive的sql的是10分钟的滚动窗口+group
> by + sum这样的操作,同时我们开启了minibatch这样的优化选项。
>
>
> 2. 由于我们没有使用rocks db和批处理,按照官网的说法,这一块内存是不占用的,但是我尝试给这个区域配置为0,会报下面空指针的异常。
> java.lang.NullPointerException: Initial Segment may not be null
>  at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:67)
>  at
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:46)
>  at
> org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap$RecordArea.<init>(AbstractBytesMultiMap.java:226)
>  at
> org.apache.flink.table.runtime.util.collections.binary.AbstractBytesMultiMap.<init>(AbstractBytesMultiMap.java:114)
>  at
> org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap.<init>(WindowBytesMultiMap.java:40)
>  at
> org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer.<init>(RecordsWindowBuffer.java:72)
>  at
> org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer$Factory.create(RecordsWindowBuffer.java:164)
>  at
> org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor.open(AbstractWindowAggProcessor.java:118)
>  at
> org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.open(SlicingWindowOperator.java:152)
>  at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>  at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>  at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>  at java.lang.Thread.run(Thread.java:748)
>
> 3. 下面是我们的内存配置:
>
>
> 4.
> 所以managed区域到底存储了什么东西,占用了这么大的内存?我们的kafka消息积压是否与这里的managed区域占满有关系,希望各位大佬能答疑解惑
>