You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 知而不惑 <ch...@qq.com.INVALID> on 2023/02/21 01:37:53 UTC

广播流与非广播流 数据先后问题

各位大佬好
我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
@Override
public void processElement(FileEventOuterClass.FileEvent value, BroadcastProcessFunction<FileEventOuterClass.FileEvent, List<SensitiveRule&gt;, FileEventOuterClass.FileEvent&gt;.ReadOnlyContext ctx, Collector<FileEventOuterClass.FileEvent&gt; out) {
    try {
        ReadOnlyBroadcastState<Void, List<SensitiveRule&gt;&gt; broadcastState = ctx.getBroadcastState(ruleDescriptor);

        List<SensitiveRule&gt; sensitiveRules = broadcastState.get(null);
        if (CollectionUtils.isEmpty(sensitiveRules)) {
            return;
        }
        ....
    } catch (Exception e) {
        log.error("SensitiveDataClassify err:", e);
    }
}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    MapStateDescriptor<Void, List<SensitiveRule&gt;&gt; ruleDescriptor =
            new MapStateDescriptor<&gt;("ruleBroadcastState", Types.VOID, new ListTypeInfo<&gt;(SensitiveRule.class));

    // 广播流
    BroadcastStream<List<SensitiveRule&gt;&gt; broadcast = sensitiveRule.broadcast(ruleDescriptor);

    DataStreamSource<String&gt; localhost = env.socketTextStream("localhost", 11451);
    SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt; stream = localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&gt;) value -&gt; FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());

    SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt; streamOperator = stream.connect(broadcast).process(new SensitiveDataClassify());
    streamOperator.print("qqq");
    env.execute();

}

回复: 广播流与非广播流 数据先后问题

Posted by 知而不惑 <ch...@qq.com.INVALID>.
我订阅了,但是我只能收到你的回复




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <huweihua.ckl@gmail.com&gt;;
发送时间:&nbsp;2023年2月21日(星期二) 中午12:53
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"chenliangvvip"<chenliangvvip@qq.com.invalid&gt;;

主题:&nbsp;Re: 广播流与非广播流 数据先后问题



Hi,

可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考:
https://flink.apache.org/community.html

Best,
Weihua


On Tue, Feb 21, 2023 at 12:17 PM 知而不惑 <chenliangvvip@qq.com.invalid&gt; wrote:

&gt; 有收到我的问题吗
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"
&gt;
&gt; <chenliangvvip@qq.com.INVALID&amp;gt;;
&gt; 发送时间:&amp;nbsp;2023年2月21日(星期二) 上午9:37
&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;广播流与非广播流 数据先后问题
&gt;
&gt;
&gt;
&gt; 各位大佬好
&gt; 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
&gt; 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
&gt; 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
&gt; 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
&gt; @Override
&gt; public void processElement(FileEventOuterClass.FileEvent value,
&gt; BroadcastProcessFunction<FileEventOuterClass.FileEvent,
&gt; List<SensitiveRule&amp;amp;gt;,
&gt; FileEventOuterClass.FileEvent&amp;amp;gt;.ReadOnlyContext ctx,
&gt; Collector<FileEventOuterClass.FileEvent&amp;amp;gt; out) {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; try {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ReadOnlyBroadcastState<Void,
&gt; List<SensitiveRule&amp;amp;gt;&amp;amp;gt; broadcastState =
&gt; ctx.getBroadcastState(ruleDescriptor);
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; List<SensitiveRule&amp;amp;gt;
&gt; sensitiveRules = broadcastState.get(null);
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; if
&gt; (CollectionUtils.isEmpty(sensitiveRules)) {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; return;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ....
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; } catch (Exception e) {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; log.error("SensitiveDataClassify err:", e);
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt; }
&gt; public static void main(String[] args) throws Exception {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; StreamExecutionEnvironment env =
&gt; StreamExecutionEnvironment.getExecutionEnvironment();
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; env.setParallelism(1);
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; MapStateDescriptor<Void,
&gt; List<SensitiveRule&amp;amp;gt;&amp;amp;gt; ruleDescriptor =
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; new
&gt; MapStateDescriptor<&amp;amp;gt;("ruleBroadcastState", Types.VOID, new
&gt; ListTypeInfo<&amp;amp;gt;(SensitiveRule.class));
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; // 广播流
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; BroadcastStream<List<SensitiveRule&amp;amp;gt;&amp;amp;gt;
&gt; broadcast = sensitiveRule.broadcast(ruleDescriptor);
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; DataStreamSource<String&amp;amp;gt; localhost =
&gt; env.socketTextStream("localhost", 11451);
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;amp;gt; stream =
&gt; localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&amp;amp;gt;)
&gt; value -&amp;amp;gt;
&gt; FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;amp;gt;
&gt; streamOperator = stream.connect(broadcast).process(new
&gt; SensitiveDataClassify());
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; streamOperator.print("qqq");
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; env.execute();
&gt;
&gt; }

Re: 广播流与非广播流 数据先后问题

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考:
https://flink.apache.org/community.html

Best,
Weihua


On Tue, Feb 21, 2023 at 12:17 PM 知而不惑 <ch...@qq.com.invalid> wrote:

> 有收到我的问题吗
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>
> <chenliangvvip@qq.com.INVALID&gt;;
> 发送时间:&nbsp;2023年2月21日(星期二) 上午9:37
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;广播流与非广播流 数据先后问题
>
>
>
> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction<FileEventOuterClass.FileEvent,
> List<SensitiveRule&amp;gt;,
> FileEventOuterClass.FileEvent&amp;gt;.ReadOnlyContext ctx,
> Collector<FileEventOuterClass.FileEvent&amp;gt; out) {
> &nbsp;&nbsp;&nbsp; try {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ReadOnlyBroadcastState<Void,
> List<SensitiveRule&amp;gt;&amp;gt; broadcastState =
> ctx.getBroadcastState(ruleDescriptor);
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; List<SensitiveRule&amp;gt;
> sensitiveRules = broadcastState.get(null);
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if
> (CollectionUtils.isEmpty(sensitiveRules)) {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ....
> &nbsp;&nbsp;&nbsp; } catch (Exception e) {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> log.error("SensitiveDataClassify err:", e);
> &nbsp;&nbsp;&nbsp; }
> }
> public static void main(String[] args) throws Exception {
> &nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> &nbsp;&nbsp;&nbsp; env.setParallelism(1);
>
> &nbsp;&nbsp;&nbsp; MapStateDescriptor<Void,
> List<SensitiveRule&amp;gt;&amp;gt; ruleDescriptor =
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new
> MapStateDescriptor<&amp;gt;("ruleBroadcastState", Types.VOID, new
> ListTypeInfo<&amp;gt;(SensitiveRule.class));
>
> &nbsp;&nbsp;&nbsp; // 广播流
> &nbsp;&nbsp;&nbsp; BroadcastStream<List<SensitiveRule&amp;gt;&amp;gt;
> broadcast = sensitiveRule.broadcast(ruleDescriptor);
>
> &nbsp;&nbsp;&nbsp; DataStreamSource<String&amp;gt; localhost =
> env.socketTextStream("localhost", 11451);
> &nbsp;&nbsp;&nbsp;
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;gt; stream =
> localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&amp;gt;)
> value -&amp;gt;
> FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
>
> &nbsp;&nbsp;&nbsp;
> SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;gt;
> streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
> &nbsp;&nbsp;&nbsp; streamOperator.print("qqq");
> &nbsp;&nbsp;&nbsp; env.execute();
>
> }

回复:广播流与非广播流 数据先后问题

Posted by 知而不惑 <ch...@qq.com.INVALID>.
有收到我的问题吗




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <chenliangvvip@qq.com.INVALID&gt;;
发送时间:&nbsp;2023年2月21日(星期二) 上午9:37
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;广播流与非广播流 数据先后问题



各位大佬好
我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
@Override
public void processElement(FileEventOuterClass.FileEvent value, BroadcastProcessFunction<FileEventOuterClass.FileEvent, List<SensitiveRule&amp;gt;, FileEventOuterClass.FileEvent&amp;gt;.ReadOnlyContext ctx, Collector<FileEventOuterClass.FileEvent&amp;gt; out) {
&nbsp;&nbsp;&nbsp; try {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ReadOnlyBroadcastState<Void, List<SensitiveRule&amp;gt;&amp;gt; broadcastState = ctx.getBroadcastState(ruleDescriptor);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; List<SensitiveRule&amp;gt; sensitiveRules = broadcastState.get(null);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (CollectionUtils.isEmpty(sensitiveRules)) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ....
&nbsp;&nbsp;&nbsp; } catch (Exception e) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; log.error("SensitiveDataClassify err:", e);
&nbsp;&nbsp;&nbsp; }
}
public static void main(String[] args) throws Exception {
&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
&nbsp;&nbsp;&nbsp; env.setParallelism(1);

&nbsp;&nbsp;&nbsp; MapStateDescriptor<Void, List<SensitiveRule&amp;gt;&amp;gt; ruleDescriptor =
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new MapStateDescriptor<&amp;gt;("ruleBroadcastState", Types.VOID, new ListTypeInfo<&amp;gt;(SensitiveRule.class));

&nbsp;&nbsp;&nbsp; // 广播流
&nbsp;&nbsp;&nbsp; BroadcastStream<List<SensitiveRule&amp;gt;&amp;gt; broadcast = sensitiveRule.broadcast(ruleDescriptor);

&nbsp;&nbsp;&nbsp; DataStreamSource<String&amp;gt; localhost = env.socketTextStream("localhost", 11451);
&nbsp;&nbsp;&nbsp; SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;gt; stream = localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&amp;gt;) value -&amp;gt; FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());

&nbsp;&nbsp;&nbsp; SingleOutputStreamOperator<FileEventOuterClass.FileEvent&amp;gt; streamOperator = stream.connect(broadcast).process(new SensitiveDataClassify());
&nbsp;&nbsp;&nbsp; streamOperator.print("qqq");
&nbsp;&nbsp;&nbsp; env.execute();

}

Re: 广播流与非广播流 数据先后问题

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

可以把具体的报错信息贴一下,另外代码中没有看到使用 listState 缓存元素的部分

Best,
Weihua


On Tue, Feb 21, 2023 at 9:38 AM 知而不惑 <ch...@qq.com.invalid> wrote:

> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction<FileEventOuterClass.FileEvent,
> List<SensitiveRule&gt;, FileEventOuterClass.FileEvent&gt;.ReadOnlyContext
> ctx, Collector<FileEventOuterClass.FileEvent&gt; out) {
>     try {
>         ReadOnlyBroadcastState<Void, List<SensitiveRule&gt;&gt;
> broadcastState = ctx.getBroadcastState(ruleDescriptor);
>
>         List<SensitiveRule&gt; sensitiveRules = broadcastState.get(null);
>         if (CollectionUtils.isEmpty(sensitiveRules)) {
>             return;
>         }
>         ....
>     } catch (Exception e) {
>         log.error("SensitiveDataClassify err:", e);
>     }
> }
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     MapStateDescriptor<Void, List<SensitiveRule&gt;&gt; ruleDescriptor =
>             new MapStateDescriptor<&gt;("ruleBroadcastState", Types.VOID,
> new ListTypeInfo<&gt;(SensitiveRule.class));
>
>     // 广播流
>     BroadcastStream<List<SensitiveRule&gt;&gt; broadcast =
> sensitiveRule.broadcast(ruleDescriptor);
>
>     DataStreamSource<String&gt; localhost =
> env.socketTextStream("localhost", 11451);
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt; stream =
> localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&gt;) value
> -&gt;
> FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
>
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt;
> streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
>     streamOperator.print("qqq");
>     env.execute();
>
> }