You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 陈卓宇 <25...@qq.com.INVALID> on 2022/01/11 03:31:39 UTC

回复: 批模式疑问

太感谢您无私的帮助了,我折腾了一天都不得解,底层的调度逻辑都搞明白了,就是纳闷为什么代码
出来效果不对,都怪我一叶障目了。


陈卓宇


&nbsp;




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <tsreaper96@gmail.com&gt;;
发送时间:&nbsp;2022年1月11日(星期二) 中午11:19
收件人:&nbsp;"flink中文邮件组"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 批模式疑问



Hi!

env.setRuntimeMode(RuntimeExecutionMode.BATCH); 要放在创建 table environment
之前,否则创建出来的 table environment 还是 streaming 模式。另外这个需要 Flink &gt;= 1.14。



陈卓宇 <2572805166@qq.com.invalid&gt; 于2022年1月10日周一 15:03写道:

&gt; 代码逻辑简单描述:&nbsp; 我通过fromElements的方式简单构造了几条测试数据,然后将流转表,在表上
&gt; 使用我自定义的聚合函数,进行聚合操作,最后打印
&gt; 我的代码:
&gt; StreamExecutionEnvironment env =
&gt; StreamExecutionEnvironment.getExecutionEnvironment();
&gt; StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
&gt; env.setRuntimeMode(RuntimeExecutionMode.BATCH);&nbsp; //设置调度模式为批
&gt;
&gt; DataStreamSource<Tuple2<String, Integer&amp;gt;&amp;gt; source =
&gt; env.fromElements(Tuple2.of("aa", 1),
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Tuple2.of("aa", 2),Tuple2.of("aa", 3),Tuple2.of("bb",
&gt; 2),Tuple2.of("bb", 3),Tuple2.of("bb", 4));
&gt; Table table = tenv.fromDataStream(source,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Schema.newBuilder()
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .column("f0", "STRING")
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .column("f1", "INTEGER")
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .build());
&gt; tenv.createTemporaryView("test",table);
&gt; //对表进行sql查询
&gt; tenv.createTemporarySystemFunction("Average", avg5.Average.class);
&gt; tenv.executeSql("SELECT f0,Average(f1) as rbm FROM test group by
&gt; f0").print();
&gt;
&gt; 结果:
&gt; +----+--------------------------------+--------------------------------+
&gt; | op |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; f0 |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; rbm |
&gt; +----+--------------------------------+--------------------------------+
&gt; | +I |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; | -U |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; | +U |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; | -U |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; | +U |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; | +I |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; | -U |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; | +U |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; | -U |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; | +U |&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
&gt; +----+--------------------------------+--------------------------------+
&gt; 我发现结果是一个可撤回流,这与我所预想批处理是不一致的
&gt;
&gt; 请问:我设置的batch,为什么调度模式还没有改变,我该如何解决这个问题,让他成为批处理?
&gt;
&gt; 最后:感谢之前几个问题上,社区同学的无私解答,因为自身邮箱出了一些问题,就不一一致谢了,望见谅。
&gt;
&gt; 陈卓宇
&gt;
&gt;
&gt; &amp;nbsp;