You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by huang botao <bo...@gmail.com> on 2020/11/18 08:29:54 UTC
求助:Flink DataStream 的 windowoperator 后面apply 方法不执行
Hi ,请教一个奇怪的问题:
streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
.assignTimestampsAndWatermarks(new
CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
.connect(ruleConfigSource)
.process(new MetricDataFilterProcessFunction())
.keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
MetricDataKey metricDataKey = new MetricDataKey();
metricDataKey.setDomain(metric.getDomain());
metricDataKey.setStationAliasCode(metric.getStaId());
metricDataKey.setEquipMK(metric.getEquipMK());
metricDataKey.setEquipID(metric.getEquipID());
metricDataKey.setMetric(metric.getMetric());
return metricDataKey;
})
.window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
.apply(new RichWindowFunction<Metric, MetricDataList,
MetricDataKey, TimeWindow>() {
@Override
public void apply(MetricDataKey tuple, TimeWindow window,
Iterable<Metric> input, Collector<MetricDataList> out) throws
Exception {
input.forEach(x->{
System.out.println("--->>>"+x);
});
}
})
我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
数据一直在消费着,没有任何报错信息
Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行
Posted by 赵一旦 <hi...@gmail.com>.
connect前生成watermark也是可以的应该,但是你需要把ruleConfigSource流也赋watermark。我猜是这个地方出问题了。
huang botao <bo...@gmail.com> 于2020年11月19日周四 下午12:58写道:
> hi, zhisheng, hailongwang:
>
> 感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect()
> 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。
>
>
>
> On Wed, Nov 18, 2020 at 10:46 PM zhisheng <zh...@gmail.com> wrote:
>
> > 可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。
> >
> > Best
> > zhisheng
> >
> > huang botao <bo...@gmail.com> 于2020年11月18日周三 下午10:34写道:
> >
> > > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime
> > >
> > > StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >
> > > window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法
> > >
> > >
> > > On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18...@163.com>
> wrote:
> > >
> > > > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
> > > >
> > > > 在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
> > > > >Hi ,请教一个奇怪的问题:
> > > > >
> > > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > > > >
> > > > > .assignTimestampsAndWatermarks(new
> > > > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > > > >
> > > > > .connect(ruleConfigSource)
> > > > > .process(new MetricDataFilterProcessFunction())
> > > > > .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
> > > > > MetricDataKey metricDataKey = new MetricDataKey();
> > > > > metricDataKey.setDomain(metric.getDomain());
> > > > > metricDataKey.setStationAliasCode(metric.getStaId());
> > > > > metricDataKey.setEquipMK(metric.getEquipMK());
> > > > > metricDataKey.setEquipID(metric.getEquipID());
> > > > > metricDataKey.setMetric(metric.getMetric());
> > > > > return metricDataKey;
> > > > > })
> > > > >
> > > > > .window(SlidingEventTimeWindows.of(Time.seconds(2),
> > > Time.seconds(1)))
> > > > > .apply(new RichWindowFunction<Metric, MetricDataList,
> > > > >MetricDataKey, TimeWindow>() {
> > > > > @Override
> > > > > public void apply(MetricDataKey tuple, TimeWindow window,
> > > > >Iterable<Metric> input, Collector<MetricDataList> out) throws
> > > > >Exception {
> > > > > input.forEach(x->{
> > > > > System.out.println("--->>>"+x);
> > > > > });
> > > > > }
> > > > > })
> > > > >
> > > > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的
> System.out.println("--->>>"+x);
> > > > >
> > > > >
> > > > >数据一直在消费着,没有任何报错信息
> > > >
> > >
> >
>
Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行
Posted by huang botao <bo...@gmail.com>.
hi, zhisheng, hailongwang:
感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect()
后面没有定义watermar导致,在connect后指定watermark就可以触发window了。
On Wed, Nov 18, 2020 at 10:46 PM zhisheng <zh...@gmail.com> wrote:
> 可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。
>
> Best
> zhisheng
>
> huang botao <bo...@gmail.com> 于2020年11月18日周三 下午10:34写道:
>
> > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> > window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法
> >
> >
> > On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18...@163.com> wrote:
> >
> > > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
> > >
> > > 在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
> > > >Hi ,请教一个奇怪的问题:
> > > >
> > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > > >
> > > > .assignTimestampsAndWatermarks(new
> > > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > > >
> > > > .connect(ruleConfigSource)
> > > > .process(new MetricDataFilterProcessFunction())
> > > > .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
> > > > MetricDataKey metricDataKey = new MetricDataKey();
> > > > metricDataKey.setDomain(metric.getDomain());
> > > > metricDataKey.setStationAliasCode(metric.getStaId());
> > > > metricDataKey.setEquipMK(metric.getEquipMK());
> > > > metricDataKey.setEquipID(metric.getEquipID());
> > > > metricDataKey.setMetric(metric.getMetric());
> > > > return metricDataKey;
> > > > })
> > > >
> > > > .window(SlidingEventTimeWindows.of(Time.seconds(2),
> > Time.seconds(1)))
> > > > .apply(new RichWindowFunction<Metric, MetricDataList,
> > > >MetricDataKey, TimeWindow>() {
> > > > @Override
> > > > public void apply(MetricDataKey tuple, TimeWindow window,
> > > >Iterable<Metric> input, Collector<MetricDataList> out) throws
> > > >Exception {
> > > > input.forEach(x->{
> > > > System.out.println("--->>>"+x);
> > > > });
> > > > }
> > > > })
> > > >
> > > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
> > > >
> > > >
> > > >数据一直在消费着,没有任何报错信息
> > >
> >
>
Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行
Posted by zhisheng <zh...@gmail.com>.
可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。
Best
zhisheng
huang botao <bo...@gmail.com> 于2020年11月18日周三 下午10:34写道:
> 感谢您的回复,是这样的,我这边的环境设置用的是eventTime
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法
>
>
> On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18...@163.com> wrote:
>
> > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
> >
> > 在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
> > >Hi ,请教一个奇怪的问题:
> > >
> > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > >
> > > .assignTimestampsAndWatermarks(new
> > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > >
> > > .connect(ruleConfigSource)
> > > .process(new MetricDataFilterProcessFunction())
> > > .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
> > > MetricDataKey metricDataKey = new MetricDataKey();
> > > metricDataKey.setDomain(metric.getDomain());
> > > metricDataKey.setStationAliasCode(metric.getStaId());
> > > metricDataKey.setEquipMK(metric.getEquipMK());
> > > metricDataKey.setEquipID(metric.getEquipID());
> > > metricDataKey.setMetric(metric.getMetric());
> > > return metricDataKey;
> > > })
> > >
> > > .window(SlidingEventTimeWindows.of(Time.seconds(2),
> Time.seconds(1)))
> > > .apply(new RichWindowFunction<Metric, MetricDataList,
> > >MetricDataKey, TimeWindow>() {
> > > @Override
> > > public void apply(MetricDataKey tuple, TimeWindow window,
> > >Iterable<Metric> input, Collector<MetricDataList> out) throws
> > >Exception {
> > > input.forEach(x->{
> > > System.out.println("--->>>"+x);
> > > });
> > > }
> > > })
> > >
> > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
> > >
> > >
> > >数据一直在消费着,没有任何报错信息
> >
>
Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行
Posted by huang botao <bo...@gmail.com>.
感谢您的回复,是这样的,我这边的环境设置用的是eventTime
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法
On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18...@163.com> wrote:
> 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
>
> 在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
> >Hi ,请教一个奇怪的问题:
> >
> >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> >
> > .assignTimestampsAndWatermarks(new
> >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> >
> > .connect(ruleConfigSource)
> > .process(new MetricDataFilterProcessFunction())
> > .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
> > MetricDataKey metricDataKey = new MetricDataKey();
> > metricDataKey.setDomain(metric.getDomain());
> > metricDataKey.setStationAliasCode(metric.getStaId());
> > metricDataKey.setEquipMK(metric.getEquipMK());
> > metricDataKey.setEquipID(metric.getEquipID());
> > metricDataKey.setMetric(metric.getMetric());
> > return metricDataKey;
> > })
> >
> > .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
> > .apply(new RichWindowFunction<Metric, MetricDataList,
> >MetricDataKey, TimeWindow>() {
> > @Override
> > public void apply(MetricDataKey tuple, TimeWindow window,
> >Iterable<Metric> input, Collector<MetricDataList> out) throws
> >Exception {
> > input.forEach(x->{
> > System.out.println("--->>>"+x);
> > });
> > }
> > })
> >
> >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
> >
> >
> >数据一直在消费着,没有任何报错信息
>
Re:求助:Flink DataStream 的 windowoperator 后面apply 方法不执行
Posted by hailongwang <18...@163.com>.
应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
>Hi ,请教一个奇怪的问题:
>
>streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
>
> .assignTimestampsAndWatermarks(new
>CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
>
> .connect(ruleConfigSource)
> .process(new MetricDataFilterProcessFunction())
> .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
> MetricDataKey metricDataKey = new MetricDataKey();
> metricDataKey.setDomain(metric.getDomain());
> metricDataKey.setStationAliasCode(metric.getStaId());
> metricDataKey.setEquipMK(metric.getEquipMK());
> metricDataKey.setEquipID(metric.getEquipID());
> metricDataKey.setMetric(metric.getMetric());
> return metricDataKey;
> })
>
> .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
> .apply(new RichWindowFunction<Metric, MetricDataList,
>MetricDataKey, TimeWindow>() {
> @Override
> public void apply(MetricDataKey tuple, TimeWindow window,
>Iterable<Metric> input, Collector<MetricDataList> out) throws
>Exception {
> input.forEach(x->{
> System.out.println("--->>>"+x);
> });
> }
> })
>
>我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
>
>
>数据一直在消费着,没有任何报错信息