Posted to user-zh@flink.apache.org by 郭欣瑞 <gu...@betalpha.com.INVALID> on 2023/06/19 08:18:45 UTC

Flink 1.14 demands an enormous amount of memory

While testing a job in my IDE, the job stayed in the CREATED state for a long time and eventually failed with the following error:

DeclarativeSlotPoolBridge.java:351  - Could not acquire the minimum required resources, failing slot requests. Acquired: [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered TMs: 1, registered slots: 1 free slots: 0
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
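A quick sanity check on the byte counts in the log (a small stand-alone sketch, not part of the job): 1099511627776 bytes is exactly 1024 GiB, i.e. 1 TiB, for both the task heap and the task off-heap request — a suspiciously round figure.

```java
public class ByteCountCheck {
    public static void main(String[] args) {
        // Value copied verbatim from the error message above.
        long requested = 1_099_511_627_776L;
        long oneGiB = 1024L * 1024L * 1024L;
        // prints "1024 GiB", i.e. exactly 1 TiB
        System.out.println(requested / oneGiB + " GiB");
    }
}
```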

After some digging, the most suspicious component was a Doris RowData Stream Load sink; when I commented it out and replaced it with a sink that writes to a local file, the job ran normally.
Here is my Doris sink code; the flink-doris-connector version is 1.1.1:
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
        .setTableIdentifier(parameterTool.get("doris.sfinx_database") + "." + parameterTool.get("doris.table.asset_tag_data", "asset_tag_data"))
        .setUsername(parameterTool.get("doris.user"))
        .setPassword(parameterTool.get("doris.password"));

Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("read_json_by_line", "true");

Date date = new Date();
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("FundCategoryFilter-doris" + date.getTime()).setStreamLoadProp(pro);

String[] fields = {"uid", "subject", "trade_date", "update_time", "value"};
DataType[] types = {DataTypes.VARCHAR(36), DataTypes.VARCHAR(20), DataTypes.DATE(), DataTypes.TIMESTAMP(), DataTypes.DOUBLE()};

builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(RowDataSerializer.builder().setFieldNames(fields).setType("json").setFieldType(types).build())
        .setDorisOptions(dorisBuilder.build());
fundCategoryDataStream.sinkTo(builder.build())
        .slotSharingGroup(parameterTool.get("fund_category_data_sink_group", "fund_category_sink"))
        .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
        .uid(parameterTool.get("fundCategroyDataSinkID", "fundCategroyDataSinkID_1"))
        .name("fundCategorySinkName");



Re: Re: Flink 1.14 demands an enormous amount of memory

Posted by 王国成 <ta...@163.com>.
Unsubscribe

On 2023-06-20 11:16:18, "Yanfei Lei" <fr...@gmail.com> wrote:

Re: Flink 1.14 demands an enormous amount of memory

Posted by Yanfei Lei <fr...@gmail.com>.
Hi,

From ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes),
taskOffHeapMemory=1024.000gb (1099511627776 bytes),
managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb
(67108864 bytes)}, numberOfRequiredSlots=1}], it looks like the sink node
is trying to request 1 TB of heap memory and 1 TB of off-heap memory.
You may want to additionally check whether any memory-size-related
options [1] are configured in your code or in flink-conf.yaml.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-process-size
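For reference, a sketch of the flink-conf.yaml entries worth auditing (the values shown are illustrative defaults, not recommendations — verify none of them was accidentally set to an enormous value):

```yaml
# Memory options that feed directly into the sizes shown in ResourceProfile.
taskmanager.memory.process.size: 1728m      # total TaskManager process memory
taskmanager.memory.task.heap.size: 512m     # maps to taskHeapMemory
taskmanager.memory.task.off-heap.size: 0m   # maps to taskOffHeapMemory
taskmanager.memory.managed.size: 128m       # maps to managedMemory
```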

Best,
Yanfei

Shammon FY <zj...@gmail.com> wrote on Tue, 20 Jun 2023 at 08:45:

Re: Flink 1.14 demands an enormous amount of memory

Posted by Shammon FY <zj...@gmail.com>.
Hi,

Is this Doris sink something you implemented yourself, or is it the official one provided by Flink/Doris? From the error, it looks like the sink node requested an extremely large amount of memory. You may want to verify whether something is wrong there, or whether there is a configuration option to control it.

Best,
Shammon FY

On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞 <gu...@betalpha.com.invalid> wrote:


Re: Flink 1.14 demands an enormous amount of memory

Posted by 郭欣瑞 <gu...@betalpha.com.INVALID>.
After further investigation: the job was actually running in local mode, and I had never configured the number of slots for local mode, so there were not enough slots. The 1024 GB is in fact a default value, which is why the error surfaced as this strange 1 TB memory requirement.
This had not happened before because local mode used to allocate enough slots automatically; for reasons unknown, however, the flink-doris-connector sink is not counted into the slot requirement, so the job ended up one slot short of what it needed.
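For anyone hitting the same issue: one way to give local mode enough slots is to set the slot count explicitly when creating the environment. A sketch under stated assumptions — the class name and the slot count of 4 are illustrative; the count just needs to cover every slot sharing group in the job (the sink above uses its own group, "fund_category_sink", so it needs a slot of its own).

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvWithSlots {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Give the local mini-cluster enough slots for all slot sharing
        // groups; 4 is an illustrative value, not a recommendation.
        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);
        // ... build the pipeline and call env.execute() as usual ...
    }
}
```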

> On Jun 19, 2023, at 16:18, 郭欣瑞 <gu...@betalpha.com.INVALID> wrote: