Posted to user-zh@flink.apache.org by 陈卓宇 <25...@qq.com.INVALID> on 2022/01/10 07:03:19 UTC

Question about batch mode

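A brief description of the code logic: I build a few rows of test data with fromElements, convert the stream into a table,
apply my custom aggregate function to the table to aggregate it, and finally print the result.
My code: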
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);  // set the runtime mode to batch

DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("aa", 1),
        Tuple2.of("aa", 2), Tuple2.of("aa", 3), Tuple2.of("bb", 2), Tuple2.of("bb", 3), Tuple2.of("bb", 4));
Table table = tenv.fromDataStream(source,
        Schema.newBuilder()
                .column("f0", "STRING")
                .column("f1", "INTEGER")
                .build());
tenv.createTemporaryView("test", table);
// run a SQL query on the table
tenv.createTemporarySystemFunction("Average", avg5.Average.class);
tenv.executeSql("SELECT f0,Average(f1) as rbm FROM test group by f0").print();

Result:
+----+--------------------------------+--------------------------------+
| op |                             f0 |                            rbm |
+----+--------------------------------+--------------------------------+
| +I |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
| -U |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
| +U |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
| -U |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
| +U |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
| +I |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
| -U |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
| +U |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
| -U |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
| +U |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
+----+--------------------------------+--------------------------------+
I found that the result is a retract (changelog) stream, which is not what I expected from batch processing.
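The op column above is the changelog row kind: +I inserts a row, -U retracts the previous result for a key, and +U emits the updated result. As a conceptual sketch only (plain Java, not Flink code; the class ChangelogReplay, its Kind enum and replay method are hypothetical names, and running SUM(f1) values stand in for the custom Average function, whose output is truncated in the thread), replaying such a changelog into a map keeps only the last +U per key, which is roughly the single final row per key that a batch job emits directly:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogReplay {

    // Row kinds as they appear in the "op" column: +I, -U, +U, -D.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One changelog entry: a row kind plus a (key, value) row.
    static final class Change {
        final Kind kind;
        final String key;
        final long value;

        Change(Kind kind, String key, long value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    // Applies the changelog in order and returns the final value per key.
    static Map<String, Long> replay(List<Change> log) {
        Map<String, Long> state = new LinkedHashMap<>();
        for (Change c : log) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    state.put(c.key, c.value); // new current result for the key
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    state.remove(c.key);       // retract the previous result
                    break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // Running SUM(f1) per key for the inputs aa: 1, 2, 3 and bb: 2, 3, 4.
        List<Change> log = List.of(
                new Change(Kind.INSERT, "aa", 1),
                new Change(Kind.UPDATE_BEFORE, "aa", 1),
                new Change(Kind.UPDATE_AFTER, "aa", 3),
                new Change(Kind.UPDATE_BEFORE, "aa", 3),
                new Change(Kind.UPDATE_AFTER, "aa", 6),
                new Change(Kind.INSERT, "bb", 2),
                new Change(Kind.UPDATE_BEFORE, "bb", 2),
                new Change(Kind.UPDATE_AFTER, "bb", 5),
                new Change(Kind.UPDATE_BEFORE, "bb", 5),
                new Change(Kind.UPDATE_AFTER, "bb", 9));
        System.out.println(replay(log)); // prints {aa=6, bb=9}
    }
}
```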

Question: I set the mode to batch, so why hasn't the runtime mode changed? How can I fix this so that the job actually runs as batch processing?

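Finally: thank you to the community members who generously answered my earlier questions. Because of some problems with my mailbox, I cannot thank each of you individually; I hope you will understand.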

陈卓宇



Re: Question about batch mode

Posted by 陈卓宇 <25...@qq.com.INVALID>.
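Thank you so much for your generous help! I struggled with this for a whole day without solving it. I understood the underlying scheduling logic,
but I couldn't work out why the code didn't produce the expected result. I simply couldn't see the forest for the trees.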


陈卓宇


------------------ Original Message ------------------
From: "user-zh" <tsreaper96@gmail.com>;
Sent: Tuesday, January 11, 2022, 11:19 AM
To: "Flink Chinese mailing list" <user-zh@flink.apache.org>;

Subject: Re: Question about batch mode



Hi!

env.setRuntimeMode(RuntimeExecutionMode.BATCH); must be called before the table environment is created;
otherwise the table environment that gets created is still in streaming mode. Also, this requires Flink >= 1.14.




Re: Question about batch mode

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

env.setRuntimeMode(RuntimeExecutionMode.BATCH); must be called before the table environment is created;
otherwise the table environment that gets created is still in streaming mode. Also, this requires Flink >= 1.14.
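A minimal sketch of the reordered program, assuming Flink 1.14 or later with the Table API Java bridge, the table planner and flink-clients on the classpath. The class name BatchModeExample and its run method are hypothetical, and the built-in AVG replaces the poster's custom avg5.Average function, which is not shown in the thread. The change that matters is that setRuntimeMode is called before StreamTableEnvironment.create:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BatchModeExample {

    static TableResult run() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the runtime mode FIRST, so the table environment created below picks it up.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Same bounded test data as in the original question.
        DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("aa", 1), Tuple2.of("aa", 2), Tuple2.of("aa", 3),
                Tuple2.of("bb", 2), Tuple2.of("bb", 3), Tuple2.of("bb", 4));
        Table table = tenv.fromDataStream(source,
                Schema.newBuilder()
                        .column("f0", "STRING")
                        .column("f1", "INTEGER")
                        .build());
        tenv.createTemporaryView("test", table);

        // Built-in AVG used as a stand-in for the custom aggregate function.
        return tenv.executeSql("SELECT f0, AVG(f1) AS rbm FROM test GROUP BY f0");
    }

    public static void main(String[] args) {
        run().print();
    }
}
```

If the batch mode is picked up, the printed result should contain one final row per key instead of the +I/-U/+U sequence in the output above.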


