Posted to dev@flink.apache.org by "Rui Fan (Jira)" <ji...@apache.org> on 2023/06/22 14:48:00 UTC

[jira] [Created] (FLINK-32414) Watermark alignment will cause flink jobs to hang forever when any source subtask has no SourceSplit

Rui Fan created FLINK-32414:
-------------------------------

             Summary: Watermark alignment will cause flink jobs to hang forever when any source subtask has no SourceSplit
                 Key: FLINK-32414
                 URL: https://issues.apache.org/jira/browse/FLINK-32414
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.17.1, 1.16.2
            Reporter: Rui Fan
            Assignee: Rui Fan
         Attachments: image-2023-06-22-22-43-59-671.png

When watermark alignment is enabled, Flink jobs hang forever if any source subtask has no SourceSplit.
h1. Root cause:
 # [SourceOperator#emitLatestWatermark|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L504] reports the lastEmittedWatermark to SourceCoordinator
 # If a subtask has no SourceSplit, its lastEmittedWatermark stays at [Watermark.UNINITIALIZED.getTimestamp()|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L149] forever, which is Long.MIN_VALUE.
 # SourceCoordinator combines the watermarks of all subtasks and uses the [minimum watermark|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644] as the aggregated watermark.
 # Long.MIN_VALUE is always the minimum, so maxAllowedWatermark = Long.MIN_VALUE + maxAllowedWatermarkDrift, and [SourceCoordinator announces it to all subtasks|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L168].
 # Because maxAllowedWatermark is so small, every subtask that reads a split is soon ahead of it and pauses, while the subtask without a split never raises the minimum. As a result, all source subtasks hang forever.
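The arithmetic in the steps above can be reproduced in a few lines of plain Java. This is a simplified model, not Flink code: the class and method names are hypothetical, the 20 s drift and the event timestamp are made-up values, and the pause check is assumed to be "own watermark > maxAllowedWatermark", which is our understanding of how SourceOperator decides to wait for alignment.

```java
import java.util.Arrays;

/**
 * Hypothetical model of the watermark-alignment arithmetic (not Flink code).
 */
class AlignmentHangSketch {

    // Stands in for Watermark.UNINITIALIZED.getTimestamp(), i.e. Long.MIN_VALUE.
    static final long UNINITIALIZED = Long.MIN_VALUE;

    /** Coordinator side: take the minimum reported watermark and add the allowed drift. */
    static long maxAllowedWatermark(long[] reportedWatermarks, long maxAllowedDriftMillis) {
        long min = Arrays.stream(reportedWatermarks).min().orElse(UNINITIALIZED);
        // Long.MIN_VALUE + drift does not overflow for any drift >= 0.
        return min + maxAllowedDriftMillis;
    }

    /** Subtask side (assumed rule): pause reading while the own watermark is ahead of the bound. */
    static boolean mustPause(long ownWatermark, long maxAllowedWatermark) {
        return ownWatermark > maxAllowedWatermark;
    }

    public static void main(String[] args) {
        long drift = 20_000L; // hypothetical maxAllowedWatermarkDrift of 20 s
        // Subtask 0 reads the only split; subtask 1 has no split and never advances.
        long[] reported = {1_687_445_280_000L, UNINITIALIZED};
        long bound = maxAllowedWatermark(reported, drift);
        System.out.println("maxAllowedWatermark = " + bound);
        System.out.println("subtask 0 paused: " + mustPause(reported[0], bound));
        System.out.println("subtask 1 paused: " + mustPause(reported[1], bound));
    }
}
```

In this model subtask 0 reports `paused: true` against a bound just above Long.MIN_VALUE. Subtask 1 is not paused, but it has nothing to read, so it never reports a higher watermark and the bound never moves.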

h1. How to reproduce?

The problem occurs when the Kafka topic has fewer partitions than the parallelism of the Kafka source.

Here is a demo: [code link|https://github.com/1996fanrui/fanrui-learning/commit/24b707f7805b3a61a70df1c70c26f8e8a16b006b]
 * Kafka partitions: 1
 * Source parallelism: 2

 

!image-2023-06-22-22-43-59-671.png!
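To see why this setup leaves a subtask without a split, here is a hedged sketch of partition-to-reader assignment. The owner formula (a topic-hash start index followed by round robin over partitions) is modeled on what we believe KafkaSourceEnumerator does and should be treated as an assumption; the class name and the topic name are hypothetical. Whatever the exact formula, 1 partition spread over 2 subtasks must leave one subtask empty.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of spreading Kafka partitions over source readers.
 * The owner formula is an assumption; check KafkaSourceEnumerator in your Flink version.
 */
class SplitAssignmentSketch {

    /** Assumed owner function: topic-dependent start index, then round robin by partition. */
    static int ownerOf(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    /** Returns, for each subtask index, the partitions assigned to it. */
    static List<List<Integer>> assign(String topic, int numPartitions, int parallelism) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(ownerOf(topic, p, parallelism)).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Same shape as the demo: 1 partition, parallelism 2 (topic name is made up).
        List<List<Integer>> assignment = assign("demo-topic", 1, 2);
        for (int i = 0; i < assignment.size(); i++) {
            List<Integer> splits = assignment.get(i);
            String note = splits.isEmpty() ? " (no split: watermark stays Long.MIN_VALUE)" : "";
            System.out.println("subtask " + i + " -> partitions " + splits + note);
        }
    }
}
```

Exactly one subtask prints an empty partition list; under watermark alignment that subtask keeps reporting Long.MIN_VALUE and pins the aggregated watermark.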



--
This message was sent by Atlassian Jira
(v8.20.10#820010)