Posted to user-zh@flink.apache.org by venn <wx...@163.com> on 2019/09/03 06:39:00 UTC

Flink periodic watermark generation: where is the 200 ms interval controlled?

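Hi all, today I was reading the Flink source for assigning timestamps and
watermarks. Periodic watermark generation really is periodic: judging by the
times printed to the console, it runs roughly once every 200 ms. Where is that
200 ms controlled? I cannot find the source location in the debugger's call stack.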

 

The periodic watermark is set up as follows:

            .assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)

            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(50))

 

 

The method that assigns the timestamp, in class TimestampsAndPeriodicWatermarksOperator:

 


@Override
public void processElement(StreamRecord<T> element) throws Exception {
   final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
         element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

   output.collect(element.replace(element.getValue(), newTimestamp));
}

 

 

The method that generates the watermark, in class TimestampsAndPeriodicWatermarksOperator:


@Override
public void onProcessingTime(long timestamp) throws Exception {
   // as seen from this print, the method runs once every 200 ms
   System.out.println("timestamp : " + timestamp + ", system.current : " + System.currentTimeMillis());
   // register next timer
   Watermark newWatermark = userFunction.getCurrentWatermark();
   if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
      currentWatermark = newWatermark.getTimestamp();
      // emit watermark
      output.emitWatermark(newWatermark);
   }

   long now = getProcessingTimeService().getCurrentProcessingTime();
   getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

 

 

 

Thanks, everyone.


Re: Flink periodic watermark generation: where is the 200 ms interval controlled?

Posted by "Yuan,Youjun" <yu...@baidu.com>.
For the source, see PeriodicWatermarkEmitter.
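
To make the mechanism concrete: the onProcessingTime method quoted in the question ends by registering a new timer at now + watermarkInterval, so each firing schedules the next one. Below is a minimal plain-Java sketch of that self-rescheduling pattern. It does not use Flink and is not the code of PeriodicWatermarkEmitter; ManualTimerService, PeriodicEmitter and simulate are hypothetical names made up for illustration, and the clock is advanced by hand so the run is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class SelfReschedulingTimerSketch {

    /** Callback shape borrowed from the quoted onProcessingTime(long) method. */
    interface Callback {
        void onProcessingTime(long timestamp);
    }

    /** Hypothetical stand-in for a processing-time service, driven by a manual clock. */
    static final class ManualTimerService {
        private static final class Timer {
            final long time;
            final Callback target;

            Timer(long time, Callback target) {
                this.time = time;
                this.target = target;
            }
        }

        private final PriorityQueue<Timer> timers =
                new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
        private long now = 0L;

        long getCurrentProcessingTime() {
            return now;
        }

        void registerTimer(long time, Callback target) {
            timers.add(new Timer(time, target));
        }

        /** Moves the clock forward and fires every timer that is due, in time order. */
        void advanceTo(long newTime) {
            while (!timers.isEmpty() && timers.peek().time <= newTime) {
                Timer t = timers.poll();
                now = t.time;
                t.target.onProcessingTime(t.time);
            }
            now = newTime;
        }
    }

    /** Fires once per interval by re-registering itself from inside the callback. */
    static final class PeriodicEmitter implements Callback {
        private final ManualTimerService service;
        private final long interval;
        final List<Long> firedAt = new ArrayList<>();

        PeriodicEmitter(ManualTimerService service, long interval) {
            this.service = service;
            this.interval = interval;
        }

        /** Registers the first timer; every later timer is registered by the callback. */
        void start() {
            service.registerTimer(service.getCurrentProcessingTime() + interval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) {
            firedAt.add(timestamp); // a real operator would emit a watermark here
            long now = service.getCurrentProcessingTime();
            service.registerTimer(now + interval, this); // schedule the next firing
        }
    }

    /** Runs the emitter on a manual clock and returns the times at which it fired. */
    static List<Long> simulate(long interval, long until) {
        ManualTimerService service = new ManualTimerService();
        PeriodicEmitter emitter = new PeriodicEmitter(service, interval);
        emitter.start();
        service.advanceTo(until);
        return emitter.firedAt;
    }

    public static void main(String[] args) {
        System.out.println(simulate(200L, 1000L));
    }
}
```

With an interval of 200 and the clock advanced to 1000, main prints [200, 400, 600, 800, 1000]. There is no loop or sleep with a hard-coded 200 anywhere; the period is just the value added when the next timer is registered, which is probably why the number does not show up in the call stack.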


-----Original Message-----
From: Dino Zhang <vi...@gmail.com>
Sent: Tuesday, September 3, 2019 3:14 PM
To: user-zh@flink.apache.org
Subject: Re: Flink periodic watermark generation: where is the 200 ms interval controlled?


Re: Flink periodic watermark generation: where is the 200 ms interval controlled?

Posted by Dino Zhang <vi...@gmail.com>.
hi venn,


With event time, the watermark interval defaults to 200 ms. It can be changed through the setAutoWatermarkInterval method of ExecutionConfig; see StreamExecutionEnvironment:

  /**
   * Sets the time characteristic for all streams create from this environment, e.g., processing
   * time, event time, or ingestion time.
   *
   * <p>If you set the characteristic to IngestionTime of EventTime this will set a default
   * watermark update interval of 200 ms. If this is not applicable for your application
   * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
   *
   * @param characteristic The time characteristic.
   */
  @PublicEvolving
  public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
      getConfig().setAutoWatermarkInterval(0);
    } else {
      getConfig().setAutoWatermarkInterval(200);
    }
  }
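
The branching in setStreamTimeCharacteristic above can be modelled without Flink to see which value ends up in the config. This is only a sketch: Environment and Config are hypothetical stand-ins for StreamExecutionEnvironment and ExecutionConfig, and only the two setter names are taken from the quoted code. I am also assuming, without having it in this thread, that the operator copies the configured value into its watermarkInterval field when it starts.

```java
public class WatermarkIntervalSketch {

    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    /** Hypothetical stand-in for the part of ExecutionConfig that stores the interval. */
    static final class Config {
        // The initial value is arbitrary in this sketch.
        private long autoWatermarkInterval = 0L;

        void setAutoWatermarkInterval(long millis) {
            autoWatermarkInterval = millis;
        }

        long getAutoWatermarkInterval() {
            return autoWatermarkInterval;
        }
    }

    /** Hypothetical stand-in for the environment that owns the config. */
    static final class Environment {
        private final Config config = new Config();

        Config getConfig() {
            return config;
        }

        // Same branching as the quoted setStreamTimeCharacteristic method.
        void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
            if (characteristic == TimeCharacteristic.ProcessingTime) {
                config.setAutoWatermarkInterval(0);
            } else {
                config.setAutoWatermarkInterval(200);
            }
        }
    }

    /** Returns the interval after: event time set, custom override, event time set again. */
    static long[] demo() {
        Environment env = new Environment();
        long[] seen = new long[3];

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        seen[0] = env.getConfig().getAutoWatermarkInterval();

        env.getConfig().setAutoWatermarkInterval(500);
        seen[1] = env.getConfig().getAutoWatermarkInterval();

        // Setting the characteristic again overwrites the custom value.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        seen[2] = env.getConfig().getAutoWatermarkInterval();

        return seen;
    }

    public static void main(String[] args) {
        for (long interval : demo()) {
            System.out.println(interval);
        }
    }
}
```

One consequence of the quoted code: setStreamTimeCharacteristic always writes 0 or 200, so a custom interval should be set after that call, not before it.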


