You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 刘建刚 <li...@gmail.com> on 2020/04/15 07:32:40 UTC
Re: flinksql如何控制结果输出的频率
我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
public class EarlyEmitter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1000 ms");
Table table = tEnv.fromDataStream(
env.addSource(new SourceData()), "generate_time, name, city, id, event_time.proctime");
tEnv.createTemporaryView("person", table);
String emit =
"SELECT name, COUNT(DISTINCT id)" +
"FROM person " +
"GROUP BY TUMBLE(event_time, interval '10' second), name";
Table result = tEnv.sqlQuery(emit);
tEnv.toRetractStream(result, Row.class).print();
env.execute("IncrementalGrouping");
}
private static final class SourceData implements SourceFunction<Tuple4<Long, String, String, Long>> {
@Override
public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) throws Exception {
while (true) {
long time = System.currentTimeMillis();
ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
}
}
@Override
public void cancel() {
}
}
}
> 2020年3月27日 下午3:23,Benchao Li <li...@gmail.com> 写道:
>
> Hi,
>
> 对于第二个场景,可以尝试一下fast emit:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 5min
>
> PS:
> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
>
> Jingsong Li <ji...@gmail.com> 于2020年3月27日周五 下午2:51写道:
>
>> Hi,
>>
>> For #1:
>> 创建级联的两级window:
>> - 1分钟窗口
>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>>
>> Best,
>> Jingsong Lee
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
Re: flinksql如何控制结果输出的频率
Posted by Benchao Li <li...@gmail.com>.
非常开心能够帮助到你~
刘建刚 <li...@gmail.com> 于2020年4月15日周三 下午3:57写道:
> 感谢 Benchao,问题应解决了!
>
> 2020年4月15日 下午3:38,Benchao Li <li...@gmail.com> 写道:
>
> Hi 建刚,
>
> 现在Emit的原理是这样子的:
> - *当某个key*下面来了第一条数据的时候,注册一个emit delay之后的*处理时间定时器*;
> - 当定时器到了的时候,
> - 检查当前的key下的聚合结果跟上次输出的结果是否有变化,
> - 如果有变化,就发送-[old], +[new] 两条结果到下游;
> - 如果是*没有变化,则不做任何处理*;
> - 再次注册一个新的emit delay之后的处理时间定时器。
>
> 你可以根据这个原理,再对照下你的数据,看看是否符合预期。
>
> 刘建刚 <li...@gmail.com> 于2020年4月15日周三 下午3:32写道:
>
>>
>> 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
>>
>> public class EarlyEmitter {
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setParallelism(1);
>>
>> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink
>>
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
>> settings);
>>
>> tEnv.getConfig().getConfiguration().setBoolean(
>> "table.exec.emit.early-fire.enabled", true);
>> tEnv.getConfig().getConfiguration().setString(
>> "table.exec.emit.early-fire.delay", "1000 ms");
>>
>> Table table = tEnv.fromDataStream(
>> env.addSource(new SourceData()), "generate_time, name, city, id,
>> event_time.proctime");
>> tEnv.createTemporaryView("person", table);
>>
>> String emit =
>> "SELECT name, COUNT(DISTINCT id)" +
>> "FROM person " +
>> "GROUP BY TUMBLE(event_time, interval '10' second), name";
>>
>> Table result = tEnv.sqlQuery(emit);
>> tEnv.toRetractStream(result, Row.class).print();
>>
>> env.execute("IncrementalGrouping");
>> }
>>
>> private static final class SourceData implements
>> SourceFunction<Tuple4<Long, String, String, Long>> {
>> @Override
>> public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) throws
>> Exception {
>> while (true) {
>> long time = System.currentTimeMillis();
>> ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
>> }
>> }
>>
>> @Override
>> public void cancel() {
>>
>> }
>> }
>> }
>>
>>
>>
>>
>>
>> 2020年3月27日 下午3:23,Benchao Li <li...@gmail.com> 写道:
>>
>> Hi,
>>
>> 对于第二个场景,可以尝试一下fast emit:
>> table.exec.emit.early-fire.enabled = true
>> table.exec.emit.early-fire.delay = 5min
>>
>> PS:
>> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
>> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
>>
>> Jingsong Li <ji...@gmail.com> 于2020年3月27日周五 下午2:51写道:
>>
>> Hi,
>>
>> For #1:
>> 创建级联的两级window:
>> - 1分钟窗口
>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>>
>> Best,
>> Jingsong Lee
>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenchao@gmail.com <li...@gmail.com>; libenchao@pku.edu.cn
>>
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn
Re: flinksql如何控制结果输出的频率
Posted by Benchao Li <li...@apache.org>.
可以具体描述下你的问题么,没太看懂你的问题。
smallwong <sm...@163.com> 于2020年10月14日周三 下午6:57写道:
> 哈喽,请问是做了什么调整?才10秒的窗口,期待每秒都输出结果的
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Best,
Benchao Li
Re: flinksql如何控制结果输出的频率
Posted by smallwong <sm...@163.com>.
哈喽,请问是做了什么调整?才10秒的窗口,期待每秒都输出结果的
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flinksql如何控制结果输出的频率
Posted by 刘建刚 <li...@gmail.com>.
感谢 Benchao,问题应解决了!
> 2020年4月15日 下午3:38,Benchao Li <li...@gmail.com> 写道:
>
> Hi 建刚,
>
> 现在Emit的原理是这样子的:
> - 当某个key下面来了第一条数据的时候,注册一个emit delay之后的处理时间定时器;
> - 当定时器到了的时候,
> - 检查当前的key下的聚合结果跟上次输出的结果是否有变化,
> - 如果有变化,就发送-[old], +[new] 两条结果到下游;
> - 如果是没有变化,则不做任何处理;
> - 再次注册一个新的emit delay之后的处理时间定时器。
>
> 你可以根据这个原理,再对照下你的数据,看看是否符合预期。
>
> 刘建刚 <liujiangangpeng@gmail.com <ma...@gmail.com>> 于2020年4月15日周三 下午3:32写道:
> 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
>
> public class EarlyEmitter {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>
> tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
> tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1000 ms");
>
> Table table = tEnv.fromDataStream(
> env.addSource(new SourceData()), "generate_time, name, city, id, event_time.proctime");
> tEnv.createTemporaryView("person", table);
>
> String emit =
> "SELECT name, COUNT(DISTINCT id)" +
> "FROM person " +
> "GROUP BY TUMBLE(event_time, interval '10' second), name";
>
> Table result = tEnv.sqlQuery(emit);
> tEnv.toRetractStream(result, Row.class).print();
>
> env.execute("IncrementalGrouping");
> }
>
> private static final class SourceData implements SourceFunction<Tuple4<Long, String, String, Long>> {
> @Override
> public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) throws Exception {
> while (true) {
> long time = System.currentTimeMillis();
> ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
> }
> }
>
> @Override
> public void cancel() {
>
> }
> }
> }
>
>
>
>
>> 2020年3月27日 下午3:23,Benchao Li <libenchao@gmail.com <ma...@gmail.com>> 写道:
>>
>> Hi,
>>
>> 对于第二个场景,可以尝试一下fast emit:
>> table.exec.emit.early-fire.enabled = true
>> table.exec.emit.early-fire.delay = 5min
>>
>> PS:
>> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
>> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
>>
>> Jingsong Li <jingsonglee0@gmail.com <ma...@gmail.com>> 于2020年3月27日周五 下午2:51写道:
>>
>>> Hi,
>>>
>>> For #1:
>>> 创建级联的两级window:
>>> - 1分钟窗口
>>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenchao@gmail.com <ma...@gmail.com>; libenchao@pku.edu.cn <ma...@pku.edu.cn>
>
>
>
> --
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com <ma...@gmail.com>; libenchao@pku.edu.cn <ma...@pku.edu.cn>
Re: flinksql如何控制结果输出的频率
Posted by Benchao Li <li...@gmail.com>.
Hi 建刚,
现在Emit的原理是这样子的:
- *当某个key*下面来了第一条数据的时候,注册一个emit delay之后的*处理时间定时器*;
- 当定时器到了的时候,
- 检查当前的key下的聚合结果跟上次输出的结果是否有变化,
- 如果有变化,就发送-[old], +[new] 两条结果到下游;
- 如果是*没有变化,则不做任何处理*;
- 再次注册一个新的emit delay之后的处理时间定时器。
你可以根据这个原理,再对照下你的数据,看看是否符合预期。
刘建刚 <li...@gmail.com> 于2020年4月15日周三 下午3:32写道:
>
> 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
>
> public class EarlyEmitter {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink
>
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings)
> ;
>
> tEnv.getConfig().getConfiguration().setBoolean(
> "table.exec.emit.early-fire.enabled", true);
> tEnv.getConfig().getConfiguration().setString(
> "table.exec.emit.early-fire.delay", "1000 ms");
>
> Table table = tEnv.fromDataStream(
> env.addSource(new SourceData()), "generate_time, name, city, id,
> event_time.proctime");
> tEnv.createTemporaryView("person", table);
>
> String emit =
> "SELECT name, COUNT(DISTINCT id)" +
> "FROM person " +
> "GROUP BY TUMBLE(event_time, interval '10' second), name";
>
> Table result = tEnv.sqlQuery(emit);
> tEnv.toRetractStream(result, Row.class).print();
>
> env.execute("IncrementalGrouping");
> }
>
> private static final class SourceData implements
> SourceFunction<Tuple4<Long, String, String, Long>> {
> @Override
> public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) throws
> Exception {
> while (true) {
> long time = System.currentTimeMillis();
> ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
> }
> }
>
> @Override
> public void cancel() {
>
> }
> }
> }
>
>
>
>
>
> 2020年3月27日 下午3:23,Benchao Li <li...@gmail.com> 写道:
>
> Hi,
>
> 对于第二个场景,可以尝试一下fast emit:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 5min
>
> PS:
> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
>
> Jingsong Li <ji...@gmail.com> 于2020年3月27日周五 下午2:51写道:
>
> Hi,
>
> For #1:
> 创建级联的两级window:
> - 1分钟窗口
> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>
> Best,
> Jingsong Lee
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com <li...@gmail.com>; libenchao@pku.edu.cn
>
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn