Posted to issues@flink.apache.org by "Xingcan Cui (JIRA)" <ji...@apache.org> on 2017/08/29 06:53:00 UTC

[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

    [ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144836#comment-16144836 ] 

Xingcan Cui commented on FLINK-7548:
------------------------------------

Thanks for opening the issue, [~jark]. I'd like to share some ideas about the problem.
1. In principle, each rowtime field should be "guarded" by its own watermarks. Although we now support multiple rowtime fields, once two streams are connected their watermarks are forcibly merged, because a two-input operator forwards only the minimum of its inputs' watermarks. As a consequence, the original watermarks may no longer be usable in subsequent operations. Should we consider re-generating them?
2. The current periodic watermark assigner is driven by machine time. I'm not sure it is applicable to rowtime, since rowtime and machine time may not be synchronized. Consider a stream replayed from a historical queue: it may be ingested at maximum speed, so a machine-time-based assigner may not work properly. For example, it may generate watermarks spanning 1 hour of rowtime within 5 seconds. How about an assigner that is periodic in rowtime rather than machine time? It can be built on the punctuated interface, following this framework:
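{code:scala}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Assumes an Order type whose rowtime is the Long field `rt` (epoch millis).
// Emits a watermark whenever the rowtime has advanced by at least `interval`
// since the last emitted watermark, independent of machine time.
class WatermarksAssigner(interval: Long) extends AssignerWithPunctuatedWatermarks[Order] {
  // Last emitted watermark.
  var currentWatermark: Long = 0L

  override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
    element.rt
  }

  override def checkAndGetNextWatermark(lastElement: Order, extractedTimestamp: Long): Watermark = {
    if (extractedTimestamp >= currentWatermark + interval) {
      // Advance by whole intervals so the watermark never passes the rowtime.
      currentWatermark += ((extractedTimestamp - currentWatermark) / interval) * interval
      new Watermark(currentWatermark)
    } else {
      // Returning null means "no watermark for this element".
      null
    }
  }
}
{code}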
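For point 1, the merging behaviour can be sketched without Flink. This minimal simulation keeps the highest watermark seen on each input and forwards their minimum only when it advances. The class name {{TwoInputWatermarks}} is hypothetical. This is not Flink's operator code, only an illustration of the rule.

{code:scala}
// Flink-free simulation (hypothetical class, not Flink code) of how a
// two-input operator combines watermarks: it remembers the highest watermark
// of each input and forwards min(input1, input2), only when that advances.
final class TwoInputWatermarks {
  private var input1 = Long.MinValue
  private var input2 = Long.MinValue
  private var combined = Long.MinValue

  def onWatermark1(wm: Long): Option[Long] = { input1 = math.max(input1, wm); advance() }
  def onWatermark2(wm: Long): Option[Long] = { input2 = math.max(input2, wm); advance() }

  // Returns the new combined watermark if the minimum moved forward.
  private def advance(): Option[Long] = {
    val m = math.min(input1, input2)
    if (m > combined) { combined = m; Some(m) } else None
  }
}
{code}

Suppose stream 1 has reached 9000 but stream 2 only 2000. Downstream operators then see 2000, so the watermark that guarded stream 1's rowtime field is no longer visible.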
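For point 2, the emission rule of the assigner can be checked without Flink. This is a minimal sketch that assumes, as in the framework, rowtimes in epoch milliseconds and an initial watermark of 0. The names {{RowtimeWatermarkRule}} and {{RowtimeReplay}} are hypothetical. Only the rowtime values drive emission. Replaying one hour of data with a 1-minute interval therefore yields the same 60 watermarks, however fast the data is ingested.

{code:scala}
// Flink-free restatement (hypothetical names) of the rowtime-based rule:
// emit a watermark once the rowtime is at least `interval` past the last
// watermark, advancing by a whole number of intervals.
final class RowtimeWatermarkRule(interval: Long) {
  require(interval > 0, "interval must be positive")
  private var current: Long = 0L

  def onRowtime(ts: Long): Option[Long] =
    if (ts >= current + interval) {
      current += ((ts - current) / interval) * interval
      Some(current)
    } else None
}

object RowtimeReplay {
  // Watermarks emitted for a replayed sequence of rowtimes. Machine time
  // does not appear anywhere, so ingestion speed cannot affect the result.
  def emitted(interval: Long, rowtimes: Seq[Long]): Seq[Long] = {
    val rule = new RowtimeWatermarkRule(interval)
    rowtimes.flatMap(ts => rule.onRowtime(ts))
  }
}
{code}

Like the framework itself, this rule has no allowance for out-of-order rows. A row whose rowtime is at or below the current watermark would be late.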
BTW, I'm quite interested in this issue. Can I take it?

Thanks, Xingcan



> Support watermark generation for TableSource
> --------------------------------------------
>
>                 Key: FLINK-7548
>                 URL: https://issues.apache.org/jira/browse/FLINK-7548
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>
> As discussed in FLINK-7446, TableSource currently only supports defining a rowtime field, not extracting watermarks from it. We can provide a new interface called {{DefinedWatermark}} with two methods: {{getRowtimeAttribute}} (which can only be an existing field) and {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked as deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in strategies, needs further discussion.
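Here is a rough sketch of the proposed {{DefinedWatermark}} interface, written in plain Scala so it compiles without Flink. The issue does not specify what {{getWatermarkGenerator}} returns. {{RowtimeWatermarkGenerator}}, {{BoundedDelayGenerator}} and {{OrderSource}} are therefore hypothetical placeholders, not Flink APIs. The bounded-delay generator only illustrates one possible built-in strategy.

{code:scala}
// Hypothetical shape of the proposal; none of these types exist in Flink.
trait RowtimeWatermarkGenerator {
  // Returns a watermark to emit after observing `rowtime`, if any.
  def onRowtime(rowtime: Long): Option[Long]
}

trait DefinedWatermark {
  // Must name an existing field of the source's schema.
  def getRowtimeAttribute: String
  def getWatermarkGenerator: RowtimeWatermarkGenerator
}

// One possible strategy: trail the highest rowtime seen by `maxDelay` ms.
final class BoundedDelayGenerator(maxDelay: Long) extends RowtimeWatermarkGenerator {
  private var maxSeen = Long.MinValue
  def onRowtime(rowtime: Long): Option[Long] =
    if (rowtime > maxSeen) { maxSeen = rowtime; Some(maxSeen - maxDelay) } else None
}

// Toy source: rejects schemas that lack the declared rowtime field.
final class OrderSource(val fieldNames: Seq[String]) extends DefinedWatermark {
  require(fieldNames.contains(getRowtimeAttribute), s"unknown rowtime field: $getRowtimeAttribute")
  override def getRowtimeAttribute: String = "rt"
  override def getWatermarkGenerator: RowtimeWatermarkGenerator = new BoundedDelayGenerator(5000L)
}
{code}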



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)