You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Xavier <xa...@gmail.com> on 2021/03/07 11:51:22 UTC
关于Watermark的使用调试问题
想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
function之后,watermark会自动重置为默认值的情况。
谢谢!
--
Best Regards,
*Xavier*
Re: 关于Watermark的使用调试问题
Posted by Xavier <xa...@gmail.com>.
是这样的,可以产生,问题从watermarksState里面拿出来的时间戳,会变成是默认值,全局除过这里有设置过,再无任何关于watermark的逻辑。
[image: image.png]
val dataLoadStream = data
.map(new EventMapFunction(config))
// Add watermark
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[EventData](Duration.ofMinutes(1))
.withTimestampAssigner(new SerializableTimestampAssigner[EventData] {
override def extractTimestamp(element: EventData, recordTimestamp: Long)
: Long = element.getEventTimestamp
})
)
On Sun, Mar 7, 2021 at 10:38 PM tison <wa...@gmail.com> wrote:
> 可以中途产生,走这个接口
>
>
> org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy<T>)
>
> 麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况
>
> Best,
> tison.
>
>
> Xavier <xa...@gmail.com> 于2021年3月7日周日 下午7:51写道:
>
> > 想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
> > function之后,watermark会自动重置为默认值的情况。
> > 谢谢!
> > --
> >
> > Best Regards,
> > *Xavier*
> >
>
--
Best Regards,
*Xavier*
Re: 关于Watermark的使用调试问题
Posted by tison <wa...@gmail.com>.
可以中途产生,走这个接口
org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy<T>)
麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况
Best,
tison.
Xavier <xa...@gmail.com> 于2021年3月7日周日 下午7:51写道:
> 想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
> function之后,watermark会自动重置为默认值的情况。
> 谢谢!
> --
>
> Best Regards,
> *Xavier*
>