You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Xavier <xa...@gmail.com> on 2021/03/07 11:51:22 UTC

关于Watermark的使用调试问题

   想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
function之后,watermark会自动重置为默认值的情况。
    谢谢!
-- 

Best Regards,
*Xavier*

Re: 关于Watermark的使用调试问题

Posted by Xavier <xa...@gmail.com>.
是这样的,可以产生,问题从watermarksState里面拿出来的时间戳,会变成是默认值,全局除过这里有设置过,再无任何关于watermark的逻辑。
[image: image.png]

val dataLoadStream = data
  .map(new EventMapFunction(config))
  // Add watermark
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[EventData](Duration.ofMinutes(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[EventData] {
        override def extractTimestamp(element: EventData, recordTimestamp: Long)
        : Long = element.getEventTimestamp
      })
  )


On Sun, Mar 7, 2021 at 10:38 PM tison <wa...@gmail.com> wrote:

> 可以中途产生,走这个接口
>
>
> org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy<T>)
>
> 麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况
>
> Best,
> tison.
>
>
> Xavier <xa...@gmail.com> 于2021年3月7日周日 下午7:51写道:
>
> >    想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
> > function之后,watermark会自动重置为默认值的情况。
> >     谢谢!
> > --
> >
> > Best Regards,
> > *Xavier*
> >
>


-- 

Best Regards,
*Xavier*

Re: 关于Watermark的使用调试问题

Posted by tison <wa...@gmail.com>.
可以中途产生,走这个接口

org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy<T>)

麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况

Best,
tison.


Xavier <xa...@gmail.com> 于2021年3月7日周日 下午7:51写道:

>    想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
> function之后,watermark会自动重置为默认值的情况。
>     谢谢!
> --
>
> Best Regards,
> *Xavier*
>