Posted to user-zh@flink.apache.org by "Bai XiangHui(智能平台)" <xi...@sohu-inc.com> on 2022/02/14 09:49:04 UTC
flink-netty-shuffle files filling up the node disks
Hi all,
Running the code below fills up the disks on all nodes; when debugging locally, it fills up the C: drive as well.
File name: flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa
Notes:
1. Batch execution mode.
2. In local testing the input directories oneDay and long total about 1 GB, but after the program starts it fills the remaining tens of GB on the C: drive (C:\Users\xxx\AppData\Local\Temp). Deployed to the cluster, it gradually fills each node's disk as well.
3. The broadcast stream blackListStream holds roughly 10,000 records. I tried commenting out both the code that reads the broadcast state in the process function and the whole processBroadcastElement method, but it made no difference.
String oneDayLogFile = "C:\\Users\\xianghuibai\\Desktop\\oneDay";
String historyFileName = "C:\\Users\\xianghuibai\\Desktop\\long";

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DataStream<String> blackListStream = env.fromCollection(
        RedisPool20484Utils.getCustomJedisCluster().smembers("user_blacklist_cid_test"));

MapStateDescriptor<String, Boolean> type = new MapStateDescriptor<>(
        "blackList_type", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
BroadcastStream<String> blackList_b = blackListStream.broadcast(type);

DataStream<Tuple5<String, String, String, String, String>> oneDayLog = env.readTextFile(oneDayLogFile)
        .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
            @Override
            public Tuple5<String, String, String, String, String> map(String line) throws Exception {
                String[] arrs = line.split("\t");
                return new Tuple5<>(arrs[0], arrs[1], arrs[2], arrs[3], arrs[4]);
            }
        });

SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> dayOutput =
        env.readTextFile(historyFileName)
                .flatMap(new FlatParseLong())
                .union(oneDayLog)
                .connect(blackList_b)
                .process(new BroadcastProcessFunction<Tuple5<String, String, String, String, String>,
                        String, Tuple5<String, String, String, String, String>>() {
                    private transient ReadOnlyBroadcastState<String, Boolean> broadcastState;

                    @Override
                    public void processElement(Tuple5<String, String, String, String, String> value,
                            ReadOnlyContext ctx,
                            Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        if (broadcastState == null) {
                            broadcastState = ctx.getBroadcastState(type);
                        }
                        if (value != null && !broadcastState.contains(value.f0)) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx,
                            Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        if (StringUtils.isNotEmpty(value)) {
                            BroadcastState<String, Boolean> broadcastState = ctx.getBroadcastState(type);
                            broadcastState.put(value, true);
                        }
                    }
                });
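For reference: in BATCH mode the flink-netty-shuffle-* directories are written under Flink's temporary directories, which default to the system temp dir (hence C:\Users\xxx\AppData\Local\Temp locally). As a minimal sketch, assuming the standard io.tmp.dirs option and an example path, they can at least be redirected to a larger volume:

```yaml
# flink-conf.yaml
# Redirect Flink's local working files (including flink-netty-shuffle-*
# blocking-shuffle data) away from the default system temp directory.
# The path below is only an example.
io.tmp.dirs: D:\flink-tmp
```

This relocates the data but does not reduce its volume; whether the space usage itself is expected is a separate question.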
Re: flink-netty-shuffle files filling up the node disks
Posted by Yingjie Cao <ke...@gmail.com>.
What exact error do you get when the disk fills up? Are you running out of iNodes or out of disk space? I believe flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa is a directory: are too many such directories left uncleaned, exhausting iNodes, or are the files under them not cleaned up, exhausting disk space? If you stop the job, is the space reclaimed? (In other words, does the job genuinely need that much disk, or is there a leak where the data remains even after the job stops?)
Also, which Flink version is the job running on? I'd suggest enabling data compression: if the job really does need a lot of disk space, compression should help save it. In addition, the default implementation is hash
shuffle, which produces many files and consumes many iNodes, so it easily triggers disk-space errors. There is a document you can refer to:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/batch/blocking_shuffle/
That page has not been updated to the latest content yet (it should be updated tomorrow); if you are in a hurry, you can also read the description in the source repository directly:
https://github.com/apache/flink/blob/master/docs/content.zh/docs/ops/batch/blocking_shuffle.md
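As a concrete sketch of the two suggestions above (compression plus sort shuffle instead of hash shuffle), assuming Flink 1.13/1.14 where these configuration keys exist; check the blocking-shuffle document linked above for your exact version:

```yaml
# flink-conf.yaml
# Compress blocking-shuffle data to reduce disk space usage.
taskmanager.network.blocking-shuffle.compression.enabled: true
# Use sort shuffle for all parallelisms (default only above a high
# threshold); it writes far fewer files than hash shuffle and so
# consumes far fewer iNodes.
taskmanager.network.sort-shuffle.min-parallelism: 1
```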