Posted to issues@flink.apache.org by "Xingcan Cui (JIRA)" <ji...@apache.org> on 2017/08/04 09:35:00 UTC

[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

    [ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16114167#comment-16114167 ] 

Xingcan Cui commented on FLINK-7245:
------------------------------------

Hi all, I'd like to throw out some basic ideas about the design.

# To support holding back watermarks, I plan to cache all received watermarks in a priority queue in the {{InternalTimeServiceManager}} and expose the necessary methods (e.g., {{peek()}} and {{poll()}}).
# For the {{advanceWatermark()}} method in {{InternalTimeServiceManager}}, I think we can add a boolean parameter indicating whether the watermark should be cached.
# A new {{WatermarkPostponableOperator}} should provide a {{triggerWatermark()}} method, which could contain a default emitting mechanism (i.e., remove some watermarks from the cache and emit them) or be (partially) user-defined in the future.
# The {{processWatermark()}} method of {{AbstractStreamOperator}} can then be overridden in {{WatermarkPostponableOperator}}.
# The watermarks can be snapshotted and restored with the {{snapshotStateForKeyGroup()}} and {{restoreStateForKeyGroup()}} methods in {{InternalTimeServiceManager}}.
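A minimal, self-contained sketch of the points above might look as follows. It does not use the Flink API: watermarks are plain {{long}} timestamps, a {{LongConsumer}} stands in for {{output.emitWatermark()}}, and the class and method names ({{WatermarkCache}}, {{PostponableOperatorSketch}}, {{snapshot()}}, {{restore()}}) are hypothetical. The release rule in {{triggerWatermark()}} (emit every cached watermark up to a given timestamp, in ascending order) is only one assumed default.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for the cache proposed for InternalTimeServiceManager (not Flink API). */
final class WatermarkCache {
    // Min-heap: peek()/poll() always return the smallest cached watermark.
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** advanceWatermark() with the proposed boolean flag deciding whether to cache. */
    void advanceWatermark(long timestamp, boolean cache) {
        // A real time service would fire the event-time timers up to 'timestamp' here.
        currentWatermark = Math.max(currentWatermark, timestamp);
        if (cache) {
            queue.add(timestamp);
        }
    }

    long currentWatermark() { return currentWatermark; }

    /** Smallest cached watermark, or null if the cache is empty. */
    Long peek() { return queue.peek(); }

    /** Removes and returns the smallest cached watermark, or null if the cache is empty. */
    Long poll() { return queue.poll(); }

    /** Sorted copy of the cache, standing in for snapshotStateForKeyGroup(). */
    List<Long> snapshot() {
        List<Long> copy = new ArrayList<>(queue);
        Collections.sort(copy);
        return copy;
    }

    /** Replaces the cache content, standing in for restoreStateForKeyGroup(). */
    void restore(List<Long> snapshot) {
        queue.clear();
        queue.addAll(snapshot);
    }
}

/** Hypothetical stand-in for the proposed WatermarkPostponableOperator (not Flink API). */
final class PostponableOperatorSketch {
    private final WatermarkCache cache = new WatermarkCache();
    private final boolean holdBack;
    private final LongConsumer output; // stands in for output.emitWatermark(...)

    PostponableOperatorSketch(boolean holdBack, LongConsumer output) {
        this.holdBack = holdBack;
        this.output = output;
    }

    /** Overridden processWatermark(): in hold-back mode the watermark is cached, not emitted. */
    void processWatermark(long timestamp) {
        cache.advanceWatermark(timestamp, holdBack);
        if (!holdBack) {
            output.accept(timestamp);
        }
    }

    /** Assumed default triggerWatermark(): emit all cached watermarks <= upTo, in order. */
    void triggerWatermark(long upTo) {
        while (cache.peek() != null && cache.peek() <= upTo) {
            output.accept(cache.poll());
        }
    }

    WatermarkCache cache() { return cache; }
}
```

Note that event time still advances while watermarks are cached, so timers can fire; only the forwarding of the watermarks is postponed until {{triggerWatermark()}} releases them.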

One open question: for an operator with two inputs, the current {{AbstractStreamOperator}} merges the two inputs' watermarks before processing them, forwarding only their minimum and only when it advances, i.e.,
{code:java}
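    // input1Watermark, input2Watermark and combinedWatermark hold the last watermark
    // seen on each input and the merged watermark processed so far.
    public void processWatermark1(Watermark mark) throws Exception {
        input1Watermark = mark.getTimestamp();
        // The merged watermark is the minimum over both inputs ...
        long newMin = Math.min(input1Watermark, input2Watermark);
        // ... and is processed only when it actually advances.
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        // Symmetric to processWatermark1(), for the second input.
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }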
{code}
I'm not sure whether we should add a separate queue for each input or just keep the current merging mechanism.
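To make the second option concrete, here is a minimal sketch under the same assumptions as before (plain {{long}} timestamps, hypothetical class and method names, not Flink API) that keeps the min-merge quoted above and caches only the merged watermark. Because the merged watermark is updated only when it strictly increases, a plain FIFO queue is already sorted on this path. The alternative with two separate queues would instead cache each input's watermarks before the merge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical two-input operator: min-merge first, then hold back the merged watermark. */
final class TwoInputHoldBackSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // The merged watermark only ever increases, so FIFO order is ascending order.
    private final ArrayDeque<Long> heldBack = new ArrayDeque<>();
    // Stands in for the watermarks actually forwarded downstream.
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        mergeAndHold();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        mergeAndHold();
    }

    private void mergeAndHold() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            heldBack.addLast(combinedWatermark); // cached instead of emitted
        }
    }

    /** Assumed release rule: forward held-back watermarks <= upTo, oldest first. */
    void triggerWatermark(long upTo) {
        while (!heldBack.isEmpty() && heldBack.peekFirst() <= upTo) {
            emitted.add(heldBack.pollFirst());
        }
    }
}
```

For example, after {{processWatermark1(10)}}, {{processWatermark2(5)}} and {{processWatermark2(15)}}, the merged watermarks 5 and 10 are cached; {{triggerWatermark(7)}} forwards only 5.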

What do you think? [~fhueske], [~aljoscha], and [~jark].

Best, Xingcan

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently, the {{AbstractStreamOperator}} applies and emits watermarks immediately:
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
> 	if (timeServiceManager != null) {
> 		timeServiceManager.advanceWatermark(mark);
> 	}
> 	output.emitWatermark(mark);
> }
> {code}
> Some results with timestamp fields that are triggered by these watermarks (e.g., join or aggregate results) may be regarded as late by downstream operators, since their timestamps must be less than or equal to the watermarks that triggered them.
> This issue aims to add another "working mode" to the current operators that supports holding back watermarks. In this mode, operators should block and cache these watermarks until all the corresponding newly generated results have been emitted.
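The lateness problem described in the issue can be illustrated with a small simulation. It is a hypothetical sketch, not Flink code: it assumes the results are produced after the operator has already received the watermark (e.g., a join match whose timestamp comes from an older buffered row), and it treats a downstream record as late when its timestamp is less than or equal to the last watermark seen, following the usual meaning of a watermark in Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the ordering problem (not Flink API). */
final class HoldBackDemo {
    /** A stream element: either a result record or a watermark, both with a timestamp. */
    static final class Element {
        final boolean isWatermark;
        final long timestamp;

        Element(boolean isWatermark, long timestamp) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
        }
    }

    /**
     * Output of an operator that receives 'watermark' and then produces results with the
     * given timestamps. Without hold-back the watermark is forwarded on arrival, so it
     * precedes the results; with hold-back it is released only after them.
     */
    static List<Element> output(long watermark, long[] resultTimestamps, boolean holdBack) {
        List<Element> out = new ArrayList<>();
        if (!holdBack) {
            out.add(new Element(true, watermark));
        }
        for (long ts : resultTimestamps) {
            out.add(new Element(false, ts));
        }
        if (holdBack) {
            out.add(new Element(true, watermark));
        }
        return out;
    }

    /** Downstream view: counts records whose timestamp is <= the last watermark seen. */
    static int countLate(List<Element> stream) {
        long currentWatermark = Long.MIN_VALUE;
        int late = 0;
        for (Element e : stream) {
            if (e.isWatermark) {
                currentWatermark = Math.max(currentWatermark, e.timestamp);
            } else if (e.timestamp <= currentWatermark) {
                late++;
            }
        }
        return late;
    }
}
```

With results at timestamps 8, 10 and 12 produced after a watermark of 10, the immediate mode yields two late records (8 and 10), while the hold-back mode yields none.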



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)