You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by huang botao <bo...@gmail.com> on 2020/11/18 08:29:54 UTC

求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

Hi ,请教一个奇怪的问题:

streamSource.flatMap(new ComeIntoMaxFlatMapFunction())

    .assignTimestampsAndWatermarks(new
CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))

    .connect(ruleConfigSource)
    .process(new MetricDataFilterProcessFunction())
    .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
        MetricDataKey metricDataKey = new MetricDataKey();
        metricDataKey.setDomain(metric.getDomain());
        metricDataKey.setStationAliasCode(metric.getStaId());
        metricDataKey.setEquipMK(metric.getEquipMK());
        metricDataKey.setEquipID(metric.getEquipID());
        metricDataKey.setMetric(metric.getMetric());
        return metricDataKey;
    })

    .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
    .apply(new RichWindowFunction<Metric, MetricDataList,
MetricDataKey, TimeWindow>() {
        @Override
        public void apply(MetricDataKey tuple, TimeWindow window,
Iterable<Metric> input, Collector<MetricDataList> out) throws
Exception {
            input.forEach(x->{
                System.out.println("--->>>"+x);
            });
        }
    })

我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);


数据一直在消费着,没有任何报错信息

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

Posted by 赵一旦 <hi...@gmail.com>.
connect前生成watermark也是可以的应该,但是你需要把ruleConfigSource流也赋watermark。我猜是这个地方出问题了。

huang botao <bo...@gmail.com> 于2020年11月19日周四 下午12:58写道:

> hi, zhisheng, hailongwang:
>
> 感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect()
> 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。
>
>
>
> On Wed, Nov 18, 2020 at 10:46 PM zhisheng <zh...@gmail.com> wrote:
>
> > 可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。
> >
> > Best
> > zhisheng
> >
> > huang botao <bo...@gmail.com> 于2020年11月18日周三 下午10:34写道:
> >
> > > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime
> > >
> > > StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >
> > > window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法
> > >
> > >
> > > On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18...@163.com>
> wrote:
> > >
> > > > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
> > > >
> > > > 在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
> > > > >Hi ,请教一个奇怪的问题:
> > > > >
> > > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > > > >
> > > > >    .assignTimestampsAndWatermarks(new
> > > > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > > > >
> > > > >    .connect(ruleConfigSource)
> > > > >    .process(new MetricDataFilterProcessFunction())
> > > > >    .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
> > > > >        MetricDataKey metricDataKey = new MetricDataKey();
> > > > >        metricDataKey.setDomain(metric.getDomain());
> > > > >        metricDataKey.setStationAliasCode(metric.getStaId());
> > > > >        metricDataKey.setEquipMK(metric.getEquipMK());
> > > > >        metricDataKey.setEquipID(metric.getEquipID());
> > > > >        metricDataKey.setMetric(metric.getMetric());
> > > > >        return metricDataKey;
> > > > >    })
> > > > >
> > > > >    .window(SlidingEventTimeWindows.of(Time.seconds(2),
> > > Time.seconds(1)))
> > > > >    .apply(new RichWindowFunction<Metric, MetricDataList,
> > > > >MetricDataKey, TimeWindow>() {
> > > > >        @Override
> > > > >        public void apply(MetricDataKey tuple, TimeWindow window,
> > > > >Iterable<Metric> input, Collector<MetricDataList> out) throws
> > > > >Exception {
> > > > >            input.forEach(x->{
> > > > >                System.out.println("--->>>"+x);
> > > > >            });
> > > > >        }
> > > > >    })
> > > > >
> > > > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的
> System.out.println("--->>>"+x);
> > > > >
> > > > >
> > > > >数据一直在消费着,没有任何报错信息
> > > >
> > >
> >
>

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

Posted by huang botao <bo...@gmail.com>.
hi, zhisheng, hailongwang:

感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect()
后面没有定义watermar导致,在connect后指定watermark就可以触发window了。



On Wed, Nov 18, 2020 at 10:46 PM zhisheng <zh...@gmail.com> wrote:

> 可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。
>
> Best
> zhisheng
>
> huang botao <bo...@gmail.com> 于2020年11月18日周三 下午10:34写道:
>
> > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> > window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法
> >
> >
> > On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18...@163.com> wrote:
> >
> > > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
> > >
> > > 在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
> > > >Hi ,请教一个奇怪的问题:
> > > >
> > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > > >
> > > >    .assignTimestampsAndWatermarks(new
> > > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > > >
> > > >    .connect(ruleConfigSource)
> > > >    .process(new MetricDataFilterProcessFunction())
> > > >    .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
> > > >        MetricDataKey metricDataKey = new MetricDataKey();
> > > >        metricDataKey.setDomain(metric.getDomain());
> > > >        metricDataKey.setStationAliasCode(metric.getStaId());
> > > >        metricDataKey.setEquipMK(metric.getEquipMK());
> > > >        metricDataKey.setEquipID(metric.getEquipID());
> > > >        metricDataKey.setMetric(metric.getMetric());
> > > >        return metricDataKey;
> > > >    })
> > > >
> > > >    .window(SlidingEventTimeWindows.of(Time.seconds(2),
> > Time.seconds(1)))
> > > >    .apply(new RichWindowFunction<Metric, MetricDataList,
> > > >MetricDataKey, TimeWindow>() {
> > > >        @Override
> > > >        public void apply(MetricDataKey tuple, TimeWindow window,
> > > >Iterable<Metric> input, Collector<MetricDataList> out) throws
> > > >Exception {
> > > >            input.forEach(x->{
> > > >                System.out.println("--->>>"+x);
> > > >            });
> > > >        }
> > > >    })
> > > >
> > > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
> > > >
> > > >
> > > >数据一直在消费着,没有任何报错信息
> > >
> >
>

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

Posted by zhisheng <zh...@gmail.com>.
可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。

Best
zhisheng

huang botao <bo...@gmail.com> 于2020年11月18日周三 下午10:34写道:

> 感谢您的回复,是这样的,我这边的环境设置用的是eventTime
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法
>
>
> On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18...@163.com> wrote:
>
> > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
> >
> > 在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
> > >Hi ,请教一个奇怪的问题:
> > >
> > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > >
> > >    .assignTimestampsAndWatermarks(new
> > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > >
> > >    .connect(ruleConfigSource)
> > >    .process(new MetricDataFilterProcessFunction())
> > >    .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
> > >        MetricDataKey metricDataKey = new MetricDataKey();
> > >        metricDataKey.setDomain(metric.getDomain());
> > >        metricDataKey.setStationAliasCode(metric.getStaId());
> > >        metricDataKey.setEquipMK(metric.getEquipMK());
> > >        metricDataKey.setEquipID(metric.getEquipID());
> > >        metricDataKey.setMetric(metric.getMetric());
> > >        return metricDataKey;
> > >    })
> > >
> > >    .window(SlidingEventTimeWindows.of(Time.seconds(2),
> Time.seconds(1)))
> > >    .apply(new RichWindowFunction<Metric, MetricDataList,
> > >MetricDataKey, TimeWindow>() {
> > >        @Override
> > >        public void apply(MetricDataKey tuple, TimeWindow window,
> > >Iterable<Metric> input, Collector<MetricDataList> out) throws
> > >Exception {
> > >            input.forEach(x->{
> > >                System.out.println("--->>>"+x);
> > >            });
> > >        }
> > >    })
> > >
> > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
> > >
> > >
> > >数据一直在消费着,没有任何报错信息
> >
>

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

Posted by huang botao <bo...@gmail.com>.
感谢您的回复,是这样的,我这边的环境设置用的是eventTime

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法


On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18...@163.com> wrote:

> 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
>
> 在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
> >Hi ,请教一个奇怪的问题:
> >
> >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> >
> >    .assignTimestampsAndWatermarks(new
> >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> >
> >    .connect(ruleConfigSource)
> >    .process(new MetricDataFilterProcessFunction())
> >    .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
> >        MetricDataKey metricDataKey = new MetricDataKey();
> >        metricDataKey.setDomain(metric.getDomain());
> >        metricDataKey.setStationAliasCode(metric.getStaId());
> >        metricDataKey.setEquipMK(metric.getEquipMK());
> >        metricDataKey.setEquipID(metric.getEquipID());
> >        metricDataKey.setMetric(metric.getMetric());
> >        return metricDataKey;
> >    })
> >
> >    .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
> >    .apply(new RichWindowFunction<Metric, MetricDataList,
> >MetricDataKey, TimeWindow>() {
> >        @Override
> >        public void apply(MetricDataKey tuple, TimeWindow window,
> >Iterable<Metric> input, Collector<MetricDataList> out) throws
> >Exception {
> >            input.forEach(x->{
> >                System.out.println("--->>>"+x);
> >            });
> >        }
> >    })
> >
> >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
> >
> >
> >数据一直在消费着,没有任何报错信息
>

Re:求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

Posted by hailongwang <18...@163.com>.
应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进

在 2020-11-18 15:29:54,"huang botao" <bo...@gmail.com> 写道:
>Hi ,请教一个奇怪的问题:
>
>streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
>
>    .assignTimestampsAndWatermarks(new
>CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
>
>    .connect(ruleConfigSource)
>    .process(new MetricDataFilterProcessFunction())
>    .keyBy((KeySelector<Metric, MetricDataKey>) metric -> {
>        MetricDataKey metricDataKey = new MetricDataKey();
>        metricDataKey.setDomain(metric.getDomain());
>        metricDataKey.setStationAliasCode(metric.getStaId());
>        metricDataKey.setEquipMK(metric.getEquipMK());
>        metricDataKey.setEquipID(metric.getEquipID());
>        metricDataKey.setMetric(metric.getMetric());
>        return metricDataKey;
>    })
>
>    .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
>    .apply(new RichWindowFunction<Metric, MetricDataList,
>MetricDataKey, TimeWindow>() {
>        @Override
>        public void apply(MetricDataKey tuple, TimeWindow window,
>Iterable<Metric> input, Collector<MetricDataList> out) throws
>Exception {
>            input.forEach(x->{
>                System.out.println("--->>>"+x);
>            });
>        }
>    })
>
>我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
>
>
>数据一直在消费着,没有任何报错信息