You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 知而不惑 <ch...@qq.com.INVALID> on 2023/02/21 01:37:53 UTC
广播流与非广播流 数据先后问题
各位大佬好
我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
@Override
public void processElement(FileEventOuterClass.FileEvent value, BroadcastProcessFunction<FileEventOuterClass.FileEvent, List<SensitiveRule>, FileEventOuterClass.FileEvent>.ReadOnlyContext ctx, Collector<FileEventOuterClass.FileEvent> out) {
try {
ReadOnlyBroadcastState<Void, List<SensitiveRule>> broadcastState = ctx.getBroadcastState(ruleDescriptor);
List<SensitiveRule> sensitiveRules = broadcastState.get(null);
if (CollectionUtils.isEmpty(sensitiveRules)) {
return;
}
....
} catch (Exception e) {
log.error("SensitiveDataClassify err:", e);
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
MapStateDescriptor<Void, List<SensitiveRule>> ruleDescriptor =
new MapStateDescriptor<>("ruleBroadcastState", Types.VOID, new ListTypeInfo<>(SensitiveRule.class));
// 广播流
BroadcastStream<List<SensitiveRule>> broadcast = sensitiveRule.broadcast(ruleDescriptor);
DataStreamSource<String> localhost = env.socketTextStream("localhost", 11451);
SingleOutputStreamOperator<FileEventOuterClass.FileEvent> stream = localhost.map((MapFunction<String, FileEventOuterClass.FileEvent>) value -> FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
SingleOutputStreamOperator<FileEventOuterClass.FileEvent> streamOperator = stream.connect(broadcast).process(new SensitiveDataClassify());
streamOperator.print("qqq");
env.execute();
}
回复: 广播流与非广播流 数据先后问题
Posted by 知而不惑 <ch...@qq.com.INVALID>.
我订阅了,但是我只能收到你的回复
------------------ 原始邮件 ------------------
发件人: "user-zh" <huweihua.ckl@gmail.com>;
发送时间: 2023年2月21日(星期二) 中午12:53
收件人: "user-zh"<user-zh@flink.apache.org>;"chenliangvvip"<chenliangvvip@qq.com.invalid>;
主题: Re: 广播流与非广播流 数据先后问题
Hi,
可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考:
https://flink.apache.org/community.html
Best,
Weihua
On Tue, Feb 21, 2023 at 12:17 PM 知而不惑 <chenliangvvip@qq.com.invalid> wrote:
> 有收到我的问题吗
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
> "user-zh"
>
> <chenliangvvip@qq.com.INVALID&gt;;
> 发送时间:&nbsp;2023年2月21日(星期二) 上午9:37
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;广播流与非广播流 数据先后问题
>
>
>
> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction<FileEventOuterClass.FileEvent,
> List<SensitiveRule&amp;gt;,
> FileEventOuterClass.FileEvent&amp;gt;.ReadOnlyContext ctx,
> Collector<FileEventOuterClass.FileEvent&amp;gt; out) {
> &nbsp;&nbsp;&nbsp; try {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ReadOnlyBroadcastState<Void,
> List<SensitiveRule&amp;gt;&amp;gt; broadcastState =
> ctx.getBroadcastState(ruleDescriptor);
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; List<SensitiveRule&amp;gt;
> sensitiveRules = broadcastState.get(null);
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if
> (CollectionUtils.isEmpty(sensitiveRules)) {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ....
> &nbsp;&nbsp;&nbsp; } catch (Exception e) {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> log.error("SensitiveDataClassify err:", e);
> &nbsp;&nbsp;&nbsp; }
> }
> public static void main(String[] args) throws Exception {
> &nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> &nbsp;&nbsp;&nbsp; env.setParallelism(1);
>
> &nbsp;&nbsp;&nbsp; MapStateDescriptor<Void,
> List<SensitiveRule&amp;gt;&amp;gt; ruleDescriptor =
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new
> MapStateDescriptor<&amp;gt;("ruleBroadcastState", Types.VOID, new
> ListTypeInfo<&amp;gt;(SensitiveRule.class));
>
> &nbsp;&nbsp;&nbsp; // 广播流
> &nbsp;&nbsp;&nbsp; BroadcastStream<List<SensitiveRule&amp;gt;&amp;gt;
> broadcast = sensitiveRule.broadcast(ruleDescriptor);
>
> &nbsp;&nbsp;&nbsp; DataStreamSource<String&amp;gt; localhost =
> env.socketTextStream("localhost", 11451);
> &nbsp;&nbsp;&nbsp;
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;gt; stream =
> localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&amp;gt;)
> value -&amp;gt;
> FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
>
> &nbsp;&nbsp;&nbsp;
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;gt;
> streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
> &nbsp;&nbsp;&nbsp; streamOperator.print("qqq");
> &nbsp;&nbsp;&nbsp; env.execute();
>
> }
Re: 广播流与非广播流 数据先后问题
Posted by Weihua Hu <hu...@gmail.com>.
Hi,
可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考:
https://flink.apache.org/community.html
Best,
Weihua
On Tue, Feb 21, 2023 at 12:17 PM 知而不惑 <ch...@qq.com.invalid> wrote:
> 有收到我的问题吗
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人:
> "user-zh"
>
> <chenliangvvip@qq.com.INVALID>;
> 发送时间: 2023年2月21日(星期二) 上午9:37
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: 广播流与非广播流 数据先后问题
>
>
>
> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction<FileEventOuterClass.FileEvent,
> List<SensitiveRule&gt;,
> FileEventOuterClass.FileEvent&gt;.ReadOnlyContext ctx,
> Collector<FileEventOuterClass.FileEvent&gt; out) {
> try {
> ReadOnlyBroadcastState<Void,
> List<SensitiveRule&gt;&gt; broadcastState =
> ctx.getBroadcastState(ruleDescriptor);
>
> List<SensitiveRule&gt;
> sensitiveRules = broadcastState.get(null);
> if
> (CollectionUtils.isEmpty(sensitiveRules)) {
> return;
> }
> ....
> } catch (Exception e) {
>
> log.error("SensitiveDataClassify err:", e);
> }
> }
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> MapStateDescriptor<Void,
> List<SensitiveRule&gt;&gt; ruleDescriptor =
> new
> MapStateDescriptor<&gt;("ruleBroadcastState", Types.VOID, new
> ListTypeInfo<&gt;(SensitiveRule.class));
>
> // 广播流
> BroadcastStream<List<SensitiveRule&gt;&gt;
> broadcast = sensitiveRule.broadcast(ruleDescriptor);
>
> DataStreamSource<String&gt; localhost =
> env.socketTextStream("localhost", 11451);
>
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt; stream =
> localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&gt;)
> value -&gt;
> FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
>
>
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt;
> streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
> streamOperator.print("qqq");
> env.execute();
>
> }
回复:广播流与非广播流 数据先后问题
Posted by 知而不惑 <ch...@qq.com.INVALID>.
有收到我的问题吗
------------------ 原始邮件 ------------------
发件人: "user-zh" <chenliangvvip@qq.com.INVALID>;
发送时间: 2023年2月21日(星期二) 上午9:37
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: 广播流与非广播流 数据先后问题
各位大佬好
我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
@Override
public void processElement(FileEventOuterClass.FileEvent value, BroadcastProcessFunction<FileEventOuterClass.FileEvent, List<SensitiveRule&gt;, FileEventOuterClass.FileEvent&gt;.ReadOnlyContext ctx, Collector<FileEventOuterClass.FileEvent&gt; out) {
try {
ReadOnlyBroadcastState<Void, List<SensitiveRule&gt;&gt; broadcastState = ctx.getBroadcastState(ruleDescriptor);
List<SensitiveRule&gt; sensitiveRules = broadcastState.get(null);
if (CollectionUtils.isEmpty(sensitiveRules)) {
return;
}
....
} catch (Exception e) {
log.error("SensitiveDataClassify err:", e);
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
MapStateDescriptor<Void, List<SensitiveRule&gt;&gt; ruleDescriptor =
new MapStateDescriptor<&gt;("ruleBroadcastState", Types.VOID, new ListTypeInfo<&gt;(SensitiveRule.class));
// 广播流
BroadcastStream<List<SensitiveRule&gt;&gt; broadcast = sensitiveRule.broadcast(ruleDescriptor);
DataStreamSource<String&gt; localhost = env.socketTextStream("localhost", 11451);
SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt; stream = localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&gt;) value -&gt; FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt; streamOperator = stream.connect(broadcast).process(new SensitiveDataClassify());
streamOperator.print("qqq");
env.execute();
}
Re: 广播流与非广播流 数据先后问题
Posted by Weihua Hu <hu...@gmail.com>.
Hi,
可以把具体的报错信息贴一下,另外代码中没有看到使用 listState 缓存元素的部分
Best,
Weihua
On Tue, Feb 21, 2023 at 9:38 AM 知而不惑 <ch...@qq.com.invalid> wrote:
> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction<FileEventOuterClass.FileEvent,
> List<SensitiveRule>, FileEventOuterClass.FileEvent>.ReadOnlyContext
> ctx, Collector<FileEventOuterClass.FileEvent> out) {
> try {
> ReadOnlyBroadcastState<Void, List<SensitiveRule>>
> broadcastState = ctx.getBroadcastState(ruleDescriptor);
>
> List<SensitiveRule> sensitiveRules = broadcastState.get(null);
> if (CollectionUtils.isEmpty(sensitiveRules)) {
> return;
> }
> ....
> } catch (Exception e) {
> log.error("SensitiveDataClassify err:", e);
> }
> }
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> MapStateDescriptor<Void, List<SensitiveRule>> ruleDescriptor =
> new MapStateDescriptor<>("ruleBroadcastState", Types.VOID,
> new ListTypeInfo<>(SensitiveRule.class));
>
> // 广播流
> BroadcastStream<List<SensitiveRule>> broadcast =
> sensitiveRule.broadcast(ruleDescriptor);
>
> DataStreamSource<String> localhost =
> env.socketTextStream("localhost", 11451);
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent> stream =
> localhost.map((MapFunction<String, FileEventOuterClass.FileEvent>) value
> ->
> FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
>
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent>
> streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
> streamOperator.print("qqq");
> env.execute();
>
> }