Posted to issues@flink.apache.org by "Xingcan Cui (JIRA)" <ji...@apache.org> on 2017/07/27 15:19:00 UTC

[jira] [Comment Edited] (FLINK-7245) Enhance the operators to support holding back watermarks

    [ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16103282#comment-16103282 ] 

Xingcan Cui edited comment on FLINK-7245 at 7/27/17 3:18 PM:
-------------------------------------------------------------

Hi [~fhueske], it took me some time to understand how rowtime works in the current Table/SQL API. To continue the work, I'd like to share more of my understanding and some questions, which may be a bit *detailed*. Could you help confirm or answer them?
Suppose there's a class {{Order(a:Long, b:String)}}.
# When registering a rowtime attribute with the API, e.g., {{tEnv.registerDataStream("OrderA", orderA, 'a.rowtime, 'b)}}, I think the current logic is that field {{a}} is hidden in the physical schema and an extra rowtime indicator is added to the logical schema. I found the following check in {{StreamTableEnvironment.extractRowtime()}}.
{code:scala}
if (mappedIdx < 0) {
  throw new TableException(
    s"The rowtime attribute can only replace a valid field. " +
    s"${origName.getOrElse(name)} is not a field of type $streamType.")
}
{code}
However, when I tried {{tEnv.registerDataStream("OrderA", orderA, 'a, 'b, 'c.rowtime)}}, the registration also succeeded, with an extra field {{c}} appended. I understand that either allowing or rejecting the extra field could make sense, but I'm still unsure which behavior is intended.
# When a SQL query is translated, the rowtime field is omitted by the initial "{{Order}} to {{CRow}}" operator.
# The planner checks whether the rowtime field is used in the query. If it is, a generated function in the following operator sets this special field via {{ctx.timestamp()}}.
# The user must assign watermarks manually before registering the DataStream. Since the rowtime field will then be treated as a regular field, should we consider adding a configurable {{DefaultWatermarkAssigner}} for the case where none is provided?
# An extra question: does the operator automatically do anything with the records, e.g., drop late ones based on their timestamps and the current watermark?
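
For questions 4 and 5, a minimal plain-Java sketch of what a configurable default assigner and a late-record check could look like. {{DefaultWatermarkSketch}}, {{BoundedOutOfOrdernessAssigner}}, {{isLate}} and {{dropLate}} are hypothetical names, not Flink API. The sketch assumes a bounded-out-of-orderness strategy (watermark = max timestamp seen minus a fixed bound) and the watermark contract that a watermark t means no more records with timestamp <= t are expected. Real Flink watermarks are emitted periodically rather than recomputed per record; this is simplified on purpose, and whether the Table API operators drop late records at all is exactly the open question.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's API): a bounded-out-of-orderness watermark
 * generator that could serve as a default assigner, plus a late-record check.
 */
public class DefaultWatermarkSketch {

    /** Hypothetical default assigner: watermark = max seen timestamp - bound. */
    static final class BoundedOutOfOrdernessAssigner {
        private final long maxOutOfOrderness;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrdernessAssigner(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        /** Records the event timestamp and returns it unchanged. */
        long extractTimestamp(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            return eventTimestamp;
        }

        /** Current watermark; Long.MIN_VALUE until the first record arrives. */
        long currentWatermark() {
            return maxTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxTimestamp - maxOutOfOrderness;
        }
    }

    /** A record is late if its timestamp is not greater than the watermark. */
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    /** Feeds timestamps through the assigner and keeps only non-late ones. */
    static List<Long> dropLate(long[] timestamps, long bound) {
        BoundedOutOfOrdernessAssigner assigner = new BoundedOutOfOrdernessAssigner(bound);
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            // compare against the watermark as it stood before this record arrived
            long wm = assigner.currentWatermark();
            if (!isLate(ts, wm)) {
                kept.add(ts);
            }
            assigner.extractTimestamp(ts);
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(dropLate(new long[] {10, 15, 12, 3, 20, 14}, 5)); // [10, 15, 12, 20]
    }
}
```

With a bound of 5, the records 3 and 14 are dropped because they arrive at or below the current watermark (10 and 15, respectively); 12 survives because the watermark is still 10 when it arrives.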

Besides, I found a minor issue in the SQL.html documentation. It uses the same name, "rowtime", for both the field and the time attribute keyword ({{tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime")}}). Readers may be unsure whether the SQL should refer to the field name or to the "rowtime" keyword.

Thanks, Xingcan


> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently, watermarks are applied and emitted instantly by {{AbstractStreamOperator}}:
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
> 	if (timeServiceManager != null) {
> 		timeServiceManager.advanceWatermark(mark);
> 	}
> 	output.emitWatermark(mark);
> }
> {code}
> Some results with timestamp fields that are triggered by these watermarks (e.g., join or aggregation results) may be regarded as late by downstream operators, since their timestamps must be less than or equal to the watermarks that triggered them.
> This issue aims to add another "working mode" to the current operators that supports holding back watermarks. In this mode, watermarks should be blocked and stored by the operator until all the corresponding newly generated results have been emitted.
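
A minimal plain-Java sketch of the proposed hold-back mode, assuming the operator knows the timestamps of the results it has not yet emitted. {{HoldBackWatermarkSketch}}, {{registerPendingResult}} and {{emitResult}} are hypothetical names; the sketch only simulates the output order and does not extend Flink's {{AbstractStreamOperator}}. Incoming watermarks are queued and forwarded only once no pending result has a timestamp <= the watermark.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch (plain Java, not Flink's AbstractStreamOperator): holds back
 * incoming watermarks until every result with timestamp <= watermark is emitted.
 */
public class HoldBackWatermarkSketch {

    /** Everything sent downstream, in order, e.g. "R(12)" or "W(15)". */
    final List<String> output = new ArrayList<>();

    /** Timestamps of results that are registered but not yet emitted. */
    private final PriorityQueue<Long> pendingResults = new PriorityQueue<>();

    /** Watermarks received but not yet forwarded, in arrival order. */
    private final Deque<Long> heldWatermarks = new ArrayDeque<>();

    /** Registers a result that will be emitted later (e.g., a buffered join row). */
    void registerPendingResult(long timestamp) {
        pendingResults.add(timestamp);
    }

    /** Emits the pending result with the given timestamp, then releases watermarks. */
    void emitResult(long timestamp) {
        if (!pendingResults.remove(timestamp)) {
            throw new IllegalArgumentException("no pending result at " + timestamp);
        }
        output.add("R(" + timestamp + ")");
        releaseWatermarks();
    }

    /** Instead of forwarding instantly, store the watermark and release if safe. */
    void processWatermark(long watermark) {
        heldWatermarks.addLast(watermark);
        releaseWatermarks();
    }

    /** Forwards held watermarks that no pending result would violate. */
    private void releaseWatermarks() {
        while (!heldWatermarks.isEmpty()) {
            long wm = heldWatermarks.peekFirst();
            Long earliest = pendingResults.peek();
            if (earliest != null && earliest <= wm) {
                return; // a result at or below wm is still outstanding: hold back
            }
            output.add("W(" + heldWatermarks.pollFirst() + ")");
        }
    }

    public static void main(String[] args) {
        HoldBackWatermarkSketch op = new HoldBackWatermarkSketch();
        op.registerPendingResult(12);
        op.processWatermark(10); // nothing pending <= 10, forwarded immediately
        op.processWatermark(15); // held: the result at 12 is not emitted yet
        op.emitResult(12);       // R(12) goes out first, then W(15) is released
        System.out.println(op.output); // [W(10), R(12), W(15)]
    }
}
```

Because watermarks are monotonically non-decreasing, checking only the oldest held watermark is enough: if it has to wait, every later one has to wait as well.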


