Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (JIRA)" <ji...@apache.org> on 2018/01/22 18:12:01 UTC

[jira] [Comment Edited] (FLINK-5018) Make source idle timeout user configurable

    [ https://issues.apache.org/jira/browse/FLINK-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334655#comment-16334655 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5018 at 1/22/18 6:11 PM:
---------------------------------------------------------------------

[~eronwright]

Yes, you are right. Watermark-aware sources should also implement the idleness timeout within the source implementation itself.
For example, FLINK-5479 targets exactly this for the {{FlinkKafkaConsumer}}.

I haven't fleshed out the details yet, but as far as I can tell, this JIRA aims to provide generic support for setting an idle timeout on sources, like so (the API is still TBD):
{code:java}
// If the source emits nothing for the specified timeout, forward the IDLE marker
// to release its hold on event-time advancement downstream.
DataStream<String> sourceStream1 = env.addSource(...).setIdleTimeout(1000L);
DataStream<String> sourceStream2 = env.addSource(...).setIdleTimeout(300L);
{code}
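To make the intended behavior concrete, here is a minimal sketch of the timeout check such a setting might drive. It is not Flink code: {{IdleTimeoutTracker}}, its method names, and the injected clock are all hypothetical, and treating the timeout as milliseconds is an assumption, since the API above is still TBD.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Flink): detects when a source has been silent for too long. */
final class IdleTimeoutTracker {
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // injected so the logic can be tested without sleeping
    private long lastEmitMillis;
    private boolean idle;

    IdleTimeoutTracker(long idleTimeoutMillis, LongSupplier clock) {
        if (idleTimeoutMillis <= 0) {
            throw new IllegalArgumentException("idle timeout must be positive");
        }
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastEmitMillis = clock.getAsLong();
    }

    /** Call whenever the source emits a record; the source becomes active again. */
    void onRecordEmitted() {
        lastEmitMillis = clock.getAsLong();
        idle = false;
    }

    /**
     * Call periodically. Returns true only on the transition from active to idle,
     * which is the point at which an IDLE marker would be forwarded downstream.
     */
    boolean checkBecameIdle() {
        if (!idle && clock.getAsLong() - lastEmitMillis >= idleTimeoutMillis) {
            idle = true;
            return true;
        }
        return false;
    }

    boolean isIdle() {
        return idle;
    }
}
```

Reporting only the transition keeps the caller from forwarding the IDLE marker repeatedly while the source stays silent.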
I think your concern is valid. For example, consider the following:
{code:java}
FlinkKafkaConsumer kafkaConsumer = ...
kafkaConsumer.assignTimestampsAndWatermarks(...); // per-partition watermarks

DataStream<String> sourceStream1 = env.addSource(...).setIdleTimeout(1000L); // generic idle timeout configuration
{code}
Is this the "unsupported combination" case that you are concerned about?

One option we could consider is to have watermark-aware sources always piggy-back on that configuration. What do you think?
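
As one possible reading of the piggy-backing idea, a watermark-aware source could apply the same generic idle timeout to each of its partitions and leave idle partitions out of its combined watermark. The sketch below only illustrates that reading. {{PartitionWatermarkTracker}} and its methods are hypothetical and do not reflect how {{FlinkKafkaConsumer}} is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical sketch: per-partition watermarks that honor a single shared idle timeout. */
final class PartitionWatermarkTracker {
    private final long idleTimeoutMillis;
    private final Map<Integer, Long> watermarks = new HashMap<>();
    private final Map<Integer, Long> lastActivityMillis = new HashMap<>();

    PartitionWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Records a watermark for a partition and marks that partition as active at nowMillis. */
    void onWatermark(int partition, long watermark, long nowMillis) {
        watermarks.merge(partition, watermark, Math::max); // per-partition watermark never regresses
        lastActivityMillis.put(partition, nowMillis);
    }

    /**
     * Minimum watermark over partitions that were active within the idle timeout.
     * Empty when every partition is idle (or none is known), i.e. the whole source is idle.
     */
    OptionalLong currentWatermark(long nowMillis) {
        return watermarks.entrySet().stream()
                .filter(e -> nowMillis - lastActivityMillis.get(e.getKey()) < idleTimeoutMillis)
                .mapToLong(Map.Entry::getValue)
                .min();
    }
}
```

A real implementation would also have to make sure the emitted watermark never decreases when a previously idle partition becomes active again; the sketch leaves that out.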


> Make source idle timeout user configurable
> ------------------------------------------
>
>                 Key: FLINK-5018
>                 URL: https://issues.apache.org/jira/browse/FLINK-5018
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.5.0
>
>
> There are two cases in which a source is considered idle and should emit an idle {{StreamStatus}} downstream, taking the Kafka consumer as an example:
> - The source instance was not assigned any partitions.
> - The source instance was assigned partitions, but they currently don't have any data.
> For the second case, we can only consider the source idle after a timeout threshold has elapsed. It would be good to make this timeout user-configurable, in addition to providing a default value.
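
The two cases in the description can be sketched as a small decision function. This is an illustration only: {{SourceIdleness}} is a hypothetical name, and the default of 60 seconds is an assumed placeholder, since the issue does not state a default value.

```java
/** Hypothetical sketch of the two idleness cases from the issue description. */
final class SourceIdleness {
    /** Assumed placeholder default; the issue does not specify one. */
    static final long DEFAULT_IDLE_TIMEOUT_MILLIS = 60_000L;

    /** Decides idleness with a user-supplied timeout. */
    static boolean isIdle(int assignedPartitions, long millisSinceLastRecord, long idleTimeoutMillis) {
        if (assignedPartitions == 0) {
            return true; // case 1: no partitions assigned, idle immediately
        }
        // case 2: partitions assigned but silent, idle only once the timeout has elapsed
        return millisSinceLastRecord >= idleTimeoutMillis;
    }

    /** Decides idleness with the default timeout. */
    static boolean isIdle(int assignedPartitions, long millisSinceLastRecord) {
        return isIdle(assignedPartitions, millisSinceLastRecord, DEFAULT_IDLE_TIMEOUT_MILLIS);
    }
}
```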


