You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 陈卓宇 <25...@qq.com.INVALID> on 2022/01/11 03:31:39 UTC
回复: 批模式疑问
太感谢您无私的帮助了,我折腾了一天都不得解,底层的调度逻辑都搞明白了,就是纳闷为什么代码
出来效果不对,都怪我一叶障目了。
陈卓宇
------------------ 原始邮件 ------------------
发件人: "user-zh" <tsreaper96@gmail.com>;
发送时间: 2022年1月11日(星期二) 中午11:19
收件人: "flink中文邮件组"<user-zh@flink.apache.org>;
主题: Re: 批模式疑问
Hi!
env.setRuntimeMode(RuntimeExecutionMode.BATCH); 要放在创建 table environment
之前,否则创建出来的 table environment 还是 streaming 模式。另外这个需要 Flink >= 1.14。
陈卓宇 <2572805166@qq.com.invalid> 于2022年1月10日周一 15:03写道:
> 代码逻辑简单描述: 我通过fromElements的方式简单构造了几条测试数据,然后将流转表,在表上
> 使用我自定义的聚合函数,进行聚合操作,最后打印
> 我的代码:
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH); //设置调度模式为批
>
> DataStreamSource<Tuple2<String, Integer&gt;&gt; source =
> env.fromElements(Tuple2.of("aa", 1),
> Tuple2.of("aa", 2),Tuple2.of("aa", 3),Tuple2.of("bb",
> 2),Tuple2.of("bb", 3),Tuple2.of("bb", 4));
> Table table = tenv.fromDataStream(source,
> Schema.newBuilder()
> .column("f0", "STRING")
> .column("f1", "INTEGER")
> .build());
> tenv.createTemporaryView("test",table);
> //对表进行sql查询
> tenv.createTemporarySystemFunction("Average", avg5.Average.class);
> tenv.executeSql("SELECT f0,Average(f1) as rbm FROM test group by
> f0").print();
>
> 结果:
> +----+--------------------------------+--------------------------------+
> | op | f0 | rbm |
> +----+--------------------------------+--------------------------------+
> | +I | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +I | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> +----+--------------------------------+--------------------------------+
> 我发现结果是一个可撤回流,这与我所预想批处理是不一致的
>
> 请问:我设置的batch,为什么调度模式还没有改变,我该如何解决这个问题,让他成为批处理?
>
> 最后:感谢之前几个问题上,社区同学的无私解答,因为自身邮箱出了一些问题,就不一一致谢了,望见谅。
>
> 陈卓宇
>
>
> &nbsp;