Posted to issues@flink.apache.org by "Jark Wu (JIRA)" <ji...@apache.org> on 2019/02/23 16:27:00 UTC

[jira] [Commented] (FLINK-11286) Support to send StreamStatus.IDLE for non-source operators

    [ https://issues.apache.org/jira/browse/FLINK-11286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775928#comment-16775928 ] 

Jark Wu commented on FLINK-11286:
---------------------------------

Hi [~aljoscha], [~tzulitai], [~yanghua], I'd like to move this discussion forward a bit. We have run into the same requirement.

In general, I agree with [~yanghua] that the source logic and the watermark logic are usually decoupled. The event timestamp may be buried in a complex data field (e.g. nested JSON or an unstructured log line), so we have to extract the event time with a JSON function, regex, or UDF, which can only be applied AFTER the source. IMO, that's 

We have hit a similar problem: one of the source partitions may receive no records for a period of time (e.g. in IoT scenarios), which blocks the window aggregation. Because the source logic and the watermark logic are decoupled, as mentioned above, we can't use {{markAsTemporarilyIdle}} to fix this.
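To illustrate why one silent partition blocks the window: an operator with several input channels only advances its watermark to the minimum over its inputs, and an input that is marked idle is left out of that minimum. Below is a simplified, self-contained sketch of that rule. The class WatermarkValveSketch and its methods are hypothetical and only loosely modeled on how Flink combines input watermarks; it ignores details such as how a channel that becomes active again is re-aligned.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of how an operator with several input
 * channels combines their watermarks: it takes the minimum over the channels
 * that are still active and ignores channels marked idle.
 */
public class WatermarkValveSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;

    public WatermarkValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark seen yet on any channel.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels]; // all channels start active
    }

    /** Records a watermark from one channel; watermarks never go backwards. */
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** Marks one input channel as idle (true) or active (false). */
    public void setIdle(int channel, boolean idle) {
        channelIdle[channel] = idle;
    }

    /** Minimum watermark over active channels; Long.MIN_VALUE if none is active. */
    public long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        WatermarkValveSketch valve = new WatermarkValveSketch(3);
        valve.onWatermark(0, 1_000L);
        valve.onWatermark(1, 2_000L);
        // Channel 2 receives no data, so its watermark never advances and
        // the combined watermark (and thus the window) is stuck.
        System.out.println(valve.combinedWatermark()); // prints -9223372036854775808
        // Once channel 2 is marked idle it no longer holds back the minimum.
        valve.setIdle(2, true);
        System.out.println(valve.combinedWatermark()); // prints 1000
    }
}
```

The same effect appears in the scenario from the issue description: if 3 of the 10 pipelines stop producing data after the filter, the window operator's watermark stays pinned to those pipelines until they are marked idle.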

I dug into the source code and found a workaround: implement a user-defined StreamOperator and use {{getContainingTask().getStreamStatusMaintainer()}} to mark the stream as idle. I'm not sure it's a good approach, but it works.
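A minimal sketch of the idle-detection logic such an operator might contain, assuming a processing-time timeout: mark the output idle once no record has arrived for a configurable interval, and mark it active again on the next record. To keep it self-contained and runnable without Flink on the classpath, the StatusSink interface and the IdleDetectorSketch class are hypothetical stand-ins; in the workaround above, markIdle()/markActive() would instead go through the maintainer returned by {{getContainingTask().getStreamStatusMaintainer()}}.

```java
import java.util.ArrayList;
import java.util.List;

public class IdleDetectorSketch {

    /** Hypothetical stand-in for the task's stream status maintainer. */
    public interface StatusSink {
        void markIdle();
        void markActive();
    }

    private final long idleTimeoutMillis;
    private final StatusSink sink;
    private long lastRecordMillis;
    private boolean idle = false;

    public IdleDetectorSketch(long idleTimeoutMillis, long startMillis, StatusSink sink) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastRecordMillis = startMillis;
        this.sink = sink;
    }

    /** Called for every record that passes through the operator. */
    public void onRecord(long nowMillis) {
        lastRecordMillis = nowMillis;
        if (idle) {
            idle = false;
            sink.markActive(); // become active again before forwarding the record
        }
    }

    /** Called periodically, e.g. from a processing-time timer. */
    public void onTimer(long nowMillis) {
        if (!idle && nowMillis - lastRecordMillis >= idleTimeoutMillis) {
            idle = true;
            sink.markIdle(); // no data for the whole timeout: report idle
        }
    }

    public boolean isIdle() {
        return idle;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        StatusSink logSink = new StatusSink() {
            @Override public void markIdle() { events.add("IDLE"); }
            @Override public void markActive() { events.add("ACTIVE"); }
        };
        IdleDetectorSketch detector = new IdleDetectorSketch(5_000L, 0L, logSink);
        detector.onRecord(1_000L);
        detector.onTimer(4_000L);  // only 3 s without data: stays active
        detector.onTimer(7_000L);  // 6 s without data: marked idle
        detector.onRecord(8_000L); // data arrives again: marked active
        System.out.println(events); // prints [IDLE, ACTIVE]
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real operator would presumably take it from a processing-time timer instead.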

However, I think this is a common case for Flink users, so maybe we should propose a standard API for it.

> Support to send StreamStatus.IDLE for non-source operators 
> -----------------------------------------------------------
>
>                 Key: FLINK-11286
>                 URL: https://issues.apache.org/jira/browse/FLINK-11286
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>
> Currently, only stream source tasks can be marked as temporarily idle, but in many cases this is too limiting.
> Consider the following scenario: a DAG {{source->map->filter->flatmap->keyBy->window}} with a parallelism of 10, where the watermarks are not generated by the source operator but further downstream, e.g. in the flatmap. No source subtask ever becomes idle. However, after the filter some pipelines can become "idle": for example, 3 pipelines may no longer send any data downstream. In that case we cannot call the {{markAsTemporarilyIdle}} method to mark those pipelines as idle, which holds back the downstream window.


