Posted to user@flink.apache.org by Jungtaek Lim <ka...@gmail.com> on 2018/07/04 08:15:16 UTC

[Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Hi Flink users,

I'm new to Flink and trying to evaluate a couple of streaming frameworks by
implementing the same apps in each.

While implementing apps with both the Table API and SQL, I found that 'no
watermark' is presented in the Flink UI, even though I had gone to some
lengths to apply a rowtime attribute.

For example, below is one of my TableSource implementations, which wraps a
DataStream reading from Kafka.

https://github.com/HeartSaVioR/iot-trucking-app-flink/blob/master/src/main/scala/net/heartsavior/flink/datasource/TruckSpeedSource.scala

(Actually, I ended up implementing a custom TableSource in order to add the
rowtime attribute as well as to read and parse JSON. I'd be really happy if
someone could suggest a way to avoid the custom TableSource
implementation.)

and below is one of the apps I implemented:

https://github.com/HeartSaVioR/iot-trucking-app-flink/blob/master/src/main/scala/net/heartsavior/flink/app/sql/IotTruckingAppMovingAggregationsOnSpeedSql.scala

Btw, I'm about to experiment with side outputs for late events, but is it
possible to leverage side outputs with the Table API / SQL? It looks like
DataStream exposes late events only when it's converted to a windowed
stream such as AllWindowedStream.

Thanks in advance!

Best Regards,
Jungtaek Lim (HeartSaVioR)

Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Thanks for the PR! I'll have a look at it later today.

The problem with the retraction stream conversion is probably that the
return type is a Tuple2[Boolean, Row].
The boolean flag indicates whether the row is added or retracted.
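
For illustration, here is a minimal sketch of consuming such a stream,
assuming a Scala StreamTableEnvironment named "tableEnv" and a result
table "outTable" as in your snippet:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.types.Row

// An explicit TypeInformation[Row] is needed, as you found out.
implicit val rowTypeInfo = Types.ROW(outTable.getSchema.getColumnNames,
  outTable.getSchema.getTypes)

// Each element is (flag, row): flag == true means the row is added,
// flag == false means a previously emitted row is retracted.
val retractStream: DataStream[(Boolean, Row)] =
  tableEnv.toRetractStream[Row](outTable)

retractStream
  .filter(_._1) // keep only additions, drop retraction messages
  .map(_._2)    // unwrap the Row
  .print()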

Best, Fabian


Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Posted by Jungtaek Lim <ka...@gmail.com>.
Thanks Fabian, I filed FLINK-9742 [1].

I'll submit a PR for FLINK-8094 providing my TimestampExtractor. The
implementation is also described in FLINK-9742. I'll start with the current
implementation, which just leverages the automatic cast from STRING to
SQL_TIMESTAMP, but we can improve it in the PR. Feedback is welcome!

Btw, this may need to go to another thread, but I also had to struggle to
find a way to convert a table to a retract stream. It looks like an
"implicit conversion" comes into play before toRetractStream and raises an
error. outTable is the result of "distinct", which seems to require retract
mode. (It wasn't even easy for me to realize that I should provide an
implicit TypeInformation for Row, but I'm fairly new to Scala, so it's
probably just me.)

// The line below doesn't work: it implicitly converts the table to an
// 'append stream' via org.apache.flink.table.api.scala.package$.table2RowDataStream,
// even though we are calling toRetractStream.
//outTable.toRetractStream[Row](outTable.dataType).print()

// Providing an explicit TypeInformation[Row] and calling toRetractStream
// on the TableEnvironment works:
implicit val typeInfo = Types.ROW(outTable.getSchema.getColumnNames,
  outTable.getSchema.getTypes)
tableEnv.toRetractStream[Row](outTable).print()


Thanks again,
Jungtaek Lim (HeartSaVioR)

[1] https://issues.apache.org/jira/browse/FLINK-9742


Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Glad you could get it to work! That's great :-)

Regarding your comments:

1) Yes, I think we should make resultType() public. Please open a Jira
issue and describe your use case.
Btw, would you like to contribute your TimestampExtractor to Flink (or even
a more generic one that allows configuring the format of the timestamp
string)? There is FLINK-8094 [1].
2) This is "expected" because you define two different schemas: the JSON
schema, which defines how to read the data, and the Table schema, which
defines how it is exposed to the Table API / SQL.

Thanks, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8094


Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Posted by Jungtaek Lim <ka...@gmail.com>.
Thanks again, Fabian, for the nice suggestion!

I finally got it working by applying your suggestion. A couple of tricks
were needed:

1. I had to apply a hack (creating the new TimestampExtractor class in a
package under org.apache.flink.blabla...) since Expression.resultType is
defined as "package private" to flink. I feel that widening the scope of
Expression's methods (at least resultType) to "public" would help users
implement custom TimestampExtractors on their side: please let me know your
thoughts on this. If you think it makes sense, I will file an issue and
submit a PR, or initiate a new thread on the dev mailing list to discuss
it, if that step is recommended.

2. To satisfy KafkaTableSource's verification of the rowtime field type,
the type of the field (here, "eventTime") should be defined as
SQL_TIMESTAMP in the table schema, whereas the type of the field in the
JSON schema should be defined as STRING.

Kafka010JsonTableSource.builder()
  .forTopic(topic)
  // Table schema: how the field is exposed to the Table API / SQL
  .withSchema(TableSchema.builder()
    .field("eventTime", Types.SQL_TIMESTAMP)
    .build())
  // JSON schema: how the field is read from the JSON data
  .forJsonSchema(TableSchema.builder()
    .field("eventTime", Types.STRING)
    .build())
  .withKafkaProperties(prop)
  .withRowtimeAttribute(
    "eventTime",
    new IsoDateStringAwareExistingField("eventTime"),
    new BoundedOutOfOrderTimestamps(Time.minutes(1).toMilliseconds)
  )
  .build()

Thanks again!
Jungtaek Lim (HeartSaVioR)


Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Jungtaek,

If it is "only" about the missing support to parse a string as timestamp,
you could also implement a custom TimestampExtractor that works similar to
the ExistingField extractor [1].
You would need to adjust a few things and use the expression
"Cast(Cast('tsString, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)" to convert
the String to a Long.
So far this works only if the date is formatted like "2018-05-28
12:34:56.000"
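
For reference, here is a minimal sketch of such an extractor, assuming
Flink 1.5 (the class name is illustrative, and TimestampExtractor and the
expression classes are internal APIs, so the exact signatures may differ):

import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.api.{Types, ValidationException}
import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
import org.apache.flink.table.sources.tsextractors.TimestampExtractor

/** Extracts a rowtime timestamp from an existing STRING field that holds
  * a date formatted like "2018-05-28 12:34:56.000". */
class StringToTimestampField(val field: String) extends TimestampExtractor {

  override def getArgumentFields: Array[String] = Array(field)

  @throws[ValidationException]
  override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
    if (argumentFieldTypes(0) != Types.STRING) {
      throw ValidationException(
        s"Field '$field' must be of type STRING but is ${argumentFieldTypes(0)}.")
    }
  }

  // Cast STRING -> SQL_TIMESTAMP -> LONG, as described above.
  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression =
    Cast(Cast(fieldAccesses(0), SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
}

An instance could then be passed to withRowtimeAttribute in place of an
ExistingField instance.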

Regarding the side outputs, these would not be handled as results but would
just redirect late records into separate data streams. We would offer a
configuration to write them to a sink like HDFS or Kafka.

Best, Fabian

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala


Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Posted by Jungtaek Lim <ka...@gmail.com>.
Thanks Chesnay! That's great news. I'll try it out with the latest master
branch.

Thanks Fabian for providing the docs!

I had already tried KafkaJsonTableSource but fell back to a custom
TableSource, since the type of the rowtime field is unfortunately string,
and I needed to parse it and map it to a new SQL timestamp field in order
to use it as the rowtime attribute.

I guess the JSON -> table field mapping is provided only for renaming, and
"withRowtimeAttribute" doesn't help with defining a new field to use as
rowtime.

Are there better approaches for this scenario? Or would it be better to
assume that the type of the rowtime field is always timestamp?

Btw, providing a late-data side output in the Table API might just be a
matter of how to define it correctly (not a technical or syntactic issue),
though providing it in SQL might be tricky (as the semantics of a SQL query
do not allow for multiple outputs).

Thanks,
Jungtaek Lim (HeartSaVioR)


Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Jungtaek,

Flink 1.5.0 features a TableSource for Kafka and JSON [1], incl. timestamp
& watermark generation [2].
It would be great if you could let us know whether that addresses your use
case and, if not, what's missing or not working.

So far, the Table API / SQL does not have support for late-data side
outputs. However, that's on the road map. The idea is to filter streams for
late events during ingestion and pass them to a side output.
Currently, operators just drop late events.
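
For comparison, here is roughly how this works on the DataStream level
today: a minimal sketch, assuming an event-time DataStream[TruckSpeed]
named "speeds" with timestamps and watermarks already assigned (the type
and names are illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class TruckSpeed(truckId: Int, speed: Double)

// Tag that identifies the late-data side output.
val lateTag = OutputTag[TruckSpeed]("late-events")

val maxSpeeds = speeds
  .keyBy(_.truckId)
  .timeWindow(Time.minutes(1))
  .sideOutputLateData(lateTag) // route late records here instead of dropping them
  .reduce((a, b) => if (a.speed > b.speed) a else b)

// The late records could then be written to a sink like HDFS or Kafka.
val lateEvents: DataStream[TruckSpeed] = maxSpeeds.getSideOutput(lateTag)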

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#kafkajsontablesource
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#configuring-a-rowtime-attribute


Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Posted by Chesnay Schepler <ch...@apache.org>.
The watermark display in the UI is broken in 1.5.0.

It is fixed on master and on the release-1.5 branch, and will be included
in 1.5.1, which is slated to be released next week.



Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI

Posted by Jungtaek Lim <ka...@gmail.com>.
Sorry, I forgot to mention the version: Flink 1.5.0. I ran the app in
IntelliJ and haven't tried it on a cluster.
