Posted to issues@flink.apache.org by "haishui (Jira)" <ji...@apache.org> on 2023/04/14 08:29:00 UTC

[jira] [Commented] (FLINK-31632) watermark aligned idle source can't resume

    [ https://issues.apache.org/jira/browse/FLINK-31632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17712261#comment-17712261 ] 

haishui commented on FLINK-31632:
---------------------------------

Hi there. The PR may be ready to merge. When can it be merged into the 1.15, 1.16, and 1.17 branches? This is my first time reporting and fixing a bug, and I don't know what to do next.

> watermark aligned idle source can't resume
> ------------------------------------------
>
>                 Key: FLINK-31632
>                 URL: https://issues.apache.org/jira/browse/FLINK-31632
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.17.0, 1.16.1, 1.15.4
>            Reporter: haishui
>            Assignee: haishui
>            Priority: Critical
>              Labels: pull-request-available
>
>  
> {code:java}
> WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
>         .<String>forBoundedOutOfOrderness(Duration.ofMillis(0))
>         .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element))
>         .withWatermarkAlignment("group", Duration.ofMillis(10), Duration.ofSeconds(2))
>         .withIdleness(Duration.ofSeconds(10)); 
> DataStreamSource<String> s1 = env.fromSource(kafkaSource("topic1"), watermarkStrategy, "S1");
> DataStreamSource<String> s2 = env.fromSource(kafkaSource("topic2"), watermarkStrategy, "S2");{code}
> Send "0" to Kafka topic1 and topic2.
>  
> After 10s, source1 and source2 are idle, and the logs are:
>  
> {code:java}
> 09:44:30,403 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
> 09:44:30,404 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
> 09:44:32,019 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
> 09:44:32,019 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
> 09:44:32,417 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
> 09:44:32,418 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
> 09:44:34,028 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
> 09:44:34,028 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
> 09:44:34,423 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
> 09:44:34,424 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
> 09:44:36,023 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0]
> 09:44:36,023 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0]
> 09:44:36,433 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
> 09:44:36,433 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0 {code}
> Now send a message to topic1 or topic2; the message can't be consumed.
>  
> The reason is:
> When a source is marked idle, lastEmittedWatermark = Long.MAX_VALUE and
> currentMaxDesiredWatermark = Long.MAX_VALUE + maxAllowedWatermarkDrift in org.apache.flink.streaming.api.operators.SourceOperator.
> The addition overflows, so currentMaxDesiredWatermark is negative and always less than lastEmittedWatermark.
> As a result, operatingMode always stays WAITING_FOR_ALIGNMENT.
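
The arithmetic described in the issue can be reproduced in isolation. The following is a minimal sketch, not Flink code: the class and the helpers plainAdd and saturatingAdd are hypothetical names introduced only for illustration, and the clamped addition is one possible way to avoid the wrap-around, not necessarily what the PR does. The 10 ms drift matches the value passed to withWatermarkAlignment in the reproduction.

```java
public class WatermarkOverflowSketch {

    // Hypothetical helper (not a Flink API): plain long addition, as the report describes.
    static long plainAdd(long watermark, long driftMillis) {
        return watermark + driftMillis;
    }

    // Hypothetical helper (not a Flink API): clamp instead of wrapping on overflow.
    static long saturatingAdd(long watermark, long driftMillis) {
        try {
            return Math.addExact(watermark, driftMillis);
        } catch (ArithmeticException e) {
            // Overflow direction follows the sign of the drift.
            return driftMillis > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
    }

    public static void main(String[] args) {
        long drift = 10L;                 // maxAllowedWatermarkDrift in milliseconds
        long idleMark = Long.MAX_VALUE;   // watermark reported once the source is idle

        // Before idleness: watermark -1 plus drift gives 9, as in the log.
        System.out.println(plainAdd(-1L, drift)); // 9

        // After idleness: the sum wraps around to a negative value.
        long wrapped = plainAdd(idleMark, drift);
        System.out.println(wrapped); // -9223372036854775799

        // The wrapped limit is always below the last emitted watermark,
        // so a comparison like this never lets the source resume.
        System.out.println(wrapped < idleMark); // true

        // With a clamped addition the limit stays at Long.MAX_VALUE.
        System.out.println(saturatingAdd(idleMark, drift) == Long.MAX_VALUE); // true
    }
}
```

The wrapped value -9223372036854775799 is the same maxAllowedWatermark that appears in the log lines at 09:44:36.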



--
This message was sent by Atlassian Jira
(v8.20.10#820010)