Posted to user-zh@flink.apache.org by "Bai XiangHui(智能平台)" <xi...@sohu-inc.com> on 2022/02/14 09:49:04 UTC

flink-netty-shuffle files filling up node disks

Hello all,
Running the code below fills up the disks on all nodes; when debugging locally it also fills up the C: drive.
Directory name: flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa
Notes:

1. The job runs in batch execution mode.
2. In local testing the input directories oneDay and long total about 1 GB. After the program starts, it fills the remaining tens of GB on the C: drive (C:\Users\xxx\AppData\Local\Temp), and once deployed to the cluster it gradually fills each node's disk as well.

3. The broadcast stream blackListStream holds roughly 10,000 records. Commenting out both the broadcast-state lookup in processElement and the processBroadcastElement method made no difference.



        String oneDayLogFile = "C:\\Users\\xianghuibai\\Desktop\\oneDay";
        String historyFileName = "C:\\Users\\xianghuibai\\Desktop\\long";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<String> blackListStream = env.fromCollection(RedisPool20484Utils.getCustomJedisCluster().smembers("user_blacklist_cid_test"));

        MapStateDescriptor<String, Boolean> type =
                new MapStateDescriptor<String, Boolean>("blackList_type", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
        BroadcastStream<String> blackList_b = blackListStream.broadcast(type);

        DataStream<Tuple5<String, String, String, String, String>> oneDayLog = env.readTextFile(oneDayLogFile)
                .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
                    @Override
                    public Tuple5<String, String, String, String, String> map(String line) throws Exception {
                        String[] arrs = line.split("\t");
                        return new Tuple5<>(arrs[0], arrs[1], arrs[2], arrs[3], arrs[4]);
                    }
                });

        SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> dayOutput = env.readTextFile(historyFileName)
                .flatMap(new FlatParseLong())
                .union(oneDayLog)
                .connect(blackList_b)
                .process(new BroadcastProcessFunction<Tuple5<String, String, String, String, String>, String, Tuple5<String, String, String, String, String>>() {
                    private transient ReadOnlyBroadcastState<String, Boolean> broadcastState;

                    @Override
                    public void processElement(Tuple5<String, String, String, String, String> value, ReadOnlyContext ctx, Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        // Lazily cache the read-only view of the broadcast state
                        if (broadcastState == null) {
                            broadcastState = ctx.getBroadcastState(type);
                        }
                        // Emit only records whose first field is not blacklisted
                        if (value != null && !broadcastState.contains(value.f0)) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        if (StringUtils.isNotEmpty(value)) {
                            BroadcastState<String, Boolean> broadcastState = ctx.getBroadcastState(type);
                            broadcastState.put(value, true);
                        }
                    }
                });


Re: flink-netty-shuffle files filling up node disks

Posted by Yingjie Cao <ke...@gmail.com>.
What exact error do you get when the disk fills up: is it running out of inodes, or running out of disk space? I assume flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa is a directory. Is the problem that too many of these directories are left behind uncleaned, exhausting inodes, or that the files inside them are not cleaned up, exhausting disk space? If you stop the job, does the space come back? In other words, does the job genuinely need this much disk, or is there a leak where the data remains even after the job stops?
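To tell the two failure modes apart, a quick check with standard tools might look like the following (the /tmp path is illustrative; substitute wherever your TaskManagers write temp files, e.g. the directories configured via io.tmp.dirs):

```shell
# Byte usage vs. inode usage on the volume holding Flink's temp directory
df -h /tmp
df -i /tmp

# Size and count of any leftover netty-shuffle directories
du -sh /tmp/flink-netty-shuffle-* 2>/dev/null
ls -d /tmp/flink-netty-shuffle-* 2>/dev/null | wc -l
```

If `df -i` shows IUse% near 100 while `df -h` still has free bytes, the problem is inode exhaustion from too many shuffle files; if `df -h` is full, it is plain disk-space exhaustion.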
Also, which Flink version is the job running on? I would suggest enabling data compression: if the job genuinely needs a lot of disk space, compression should help reduce it. In addition, the default implementation is hash shuffle, which produces a large number of files, consumes many inodes, and easily triggers "disk full" errors. There is a document you can refer to:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/batch/blocking_shuffle/
Its content has not been updated to the latest yet (it should be updated tomorrow); if it is urgent, you can also read the description directly in the source tree:
https://github.com/apache/flink/blob/master/docs/content.zh/docs/ops/batch/blocking_shuffle.md
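To make the suggestions above concrete, a flink-conf.yaml fragment along these lines could enable compression and switch to the sort-based blocking shuffle (the option names are the ones documented for blocking shuffle in Flink 1.13/1.14; verify them against the docs for your version):

```yaml
# Compress shuffle data spilled to disk by blocking (batch) shuffles
taskmanager.network.blocking-shuffle.compression.enabled: true

# Use sort-based blocking shuffle for all parallelisms instead of hash shuffle;
# it writes far fewer files and therefore uses far fewer inodes
taskmanager.network.sort-shuffle.min-parallelism: 1
```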

Bai XiangHui(智能平台) <xi...@sohu-inc.com> wrote on Mon, Feb 14, 2022 at 17:49:
