You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 知而不惑 <ch...@qq.com.INVALID> on 2023/03/14 05:44:09 UTC

回复: 广播流与非广播流 数据先后问题

我订阅了,但是我只能收到你的回复




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <huweihua.ckl@gmail.com&gt;;
发送时间:&nbsp;2023年2月21日(星期二) 中午12:53
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"chenliangvvip"<chenliangvvip@qq.com.invalid&gt;;

主题:&nbsp;Re: 广播流与非广播流 数据先后问题



Hi,

可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考:
https://flink.apache.org/community.html

Best,
Weihua


On Tue, Feb 21, 2023 at 12:17 PM 知而不惑 <chenliangvvip@qq.com.invalid&gt; wrote:

&gt; 有收到我的问题吗
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"
&gt;
&gt; <chenliangvvip@qq.com.INVALID&amp;gt;;
&gt; 发送时间:&amp;nbsp;2023年2月21日(星期二) 上午9:37
&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;广播流与非广播流 数据先后问题
&gt;
&gt;
&gt;
&gt; 各位大佬好
&gt; 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
&gt; 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
&gt; 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
&gt; 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
&gt; @Override
&gt; public void processElement(FileEventOuterClass.FileEvent value,
&gt; BroadcastProcessFunction<FileEventOuterClass.FileEvent,
&gt; List<SensitiveRule&amp;amp;gt;,
&gt; FileEventOuterClass.FileEvent&amp;amp;gt;.ReadOnlyContext ctx,
&gt; Collector<FileEventOuterClass.FileEvent&amp;amp;gt; out) {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; try {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ReadOnlyBroadcastState<Void,
&gt; List<SensitiveRule&amp;amp;gt;&amp;amp;gt; broadcastState =
&gt; ctx.getBroadcastState(ruleDescriptor);
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; List<SensitiveRule&amp;amp;gt;
&gt; sensitiveRules = broadcastState.get(null);
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; if
&gt; (CollectionUtils.isEmpty(sensitiveRules)) {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; return;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ....
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; } catch (Exception e) {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; log.error("SensitiveDataClassify err:", e);
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt; }
&gt; public static void main(String[] args) throws Exception {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; StreamExecutionEnvironment env =
&gt; StreamExecutionEnvironment.getExecutionEnvironment();
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; env.setParallelism(1);
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; MapStateDescriptor<Void,
&gt; List<SensitiveRule&amp;amp;gt;&amp;amp;gt; ruleDescriptor =
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; new
&gt; MapStateDescriptor<&amp;amp;gt;("ruleBroadcastState", Types.VOID, new
&gt; ListTypeInfo<&amp;amp;gt;(SensitiveRule.class));
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; // 广播流
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; BroadcastStream<List<SensitiveRule&amp;amp;gt;&amp;amp;gt;
&gt; broadcast = sensitiveRule.broadcast(ruleDescriptor);
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; DataStreamSource<String&amp;amp;gt; localhost =
&gt; env.socketTextStream("localhost", 11451);
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;amp;gt; stream =
&gt; localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&amp;amp;gt;)
&gt; value -&amp;amp;gt;
&gt; FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;amp;gt;
&gt; streamOperator = stream.connect(broadcast).process(new
&gt; SensitiveDataClassify());
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; streamOperator.print("qqq");
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; env.execute();
&gt;
&gt; }