You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 知而不惑 <ch...@qq.com.INVALID> on 2023/03/14 05:44:09 UTC
回复: 广播流与非广播流 数据先后问题
我订阅了,但是我只能收到你的回复
------------------ 原始邮件 ------------------
发件人: "user-zh" <huweihua.ckl@gmail.com>;
发送时间: 2023年2月21日(星期二) 中午12:53
收件人: "user-zh"<user-zh@flink.apache.org>;"chenliangvvip"<chenliangvvip@qq.com.invalid>;
主题: Re: 广播流与非广播流 数据先后问题
Hi,
可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考:
https://flink.apache.org/community.html
Best,
Weihua
On Tue, Feb 21, 2023 at 12:17 PM 知而不惑 <chenliangvvip@qq.com.invalid> wrote:
> 有收到我的问题吗
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
> "user-zh"
>
> <chenliangvvip@qq.com.INVALID&gt;;
> 发送时间:&nbsp;2023年2月21日(星期二) 上午9:37
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;广播流与非广播流 数据先后问题
>
>
>
> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction<FileEventOuterClass.FileEvent,
> List<SensitiveRule&amp;gt;,
> FileEventOuterClass.FileEvent&amp;gt;.ReadOnlyContext ctx,
> Collector<FileEventOuterClass.FileEvent&amp;gt; out) {
> &nbsp;&nbsp;&nbsp; try {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ReadOnlyBroadcastState<Void,
> List<SensitiveRule&amp;gt;&amp;gt; broadcastState =
> ctx.getBroadcastState(ruleDescriptor);
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; List<SensitiveRule&amp;gt;
> sensitiveRules = broadcastState.get(null);
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if
> (CollectionUtils.isEmpty(sensitiveRules)) {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ....
> &nbsp;&nbsp;&nbsp; } catch (Exception e) {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> log.error("SensitiveDataClassify err:", e);
> &nbsp;&nbsp;&nbsp; }
> }
> public static void main(String[] args) throws Exception {
> &nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> &nbsp;&nbsp;&nbsp; env.setParallelism(1);
>
> &nbsp;&nbsp;&nbsp; MapStateDescriptor<Void,
> List<SensitiveRule&amp;gt;&amp;gt; ruleDescriptor =
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new
> MapStateDescriptor<&amp;gt;("ruleBroadcastState", Types.VOID, new
> ListTypeInfo<&amp;gt;(SensitiveRule.class));
>
> &nbsp;&nbsp;&nbsp; // 广播流
> &nbsp;&nbsp;&nbsp; BroadcastStream<List<SensitiveRule&amp;gt;&amp;gt;
> broadcast = sensitiveRule.broadcast(ruleDescriptor);
>
> &nbsp;&nbsp;&nbsp; DataStreamSource<String&amp;gt; localhost =
> env.socketTextStream("localhost", 11451);
> &nbsp;&nbsp;&nbsp;
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;gt; stream =
> localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&amp;gt;)
> value -&amp;gt;
> FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
>
> &nbsp;&nbsp;&nbsp;
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;gt;
> streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
> &nbsp;&nbsp;&nbsp; streamOperator.print("qqq");
> &nbsp;&nbsp;&nbsp; env.execute();
>
> }