Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2022/01/20 10:32:00 UTC

[jira] [Commented] (FLINK-25199) StreamEdges are not unique in self-union blocking propagation of watermarks

    [ https://issues.apache.org/jira/browse/FLINK-25199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479244#comment-17479244 ] 

Piotr Nowojski commented on FLINK-25199:
----------------------------------------

The problem was quite strange. If a node was unioned with itself, two identical StreamEdges were created: both had the same partitioning and both went from the same source node to the same target node.
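
As a rough illustration, here is a minimal sketch of why two such edges cannot be told apart. It assumes an edge is identified only by its source id, target id and partitioning, and that per-edge output writers are resolved through a map keyed by edge value. `SelfUnionEdgeDemo`, `Edge`, `resolveWriters` and the `"REBALANCE"` label are hypothetical and are not Flink's `StreamEdge` or `StreamTask` code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class SelfUnionEdgeDemo {

    // Hypothetical edge identified only by (source, target, partitioner),
    // i.e. a simplified StreamEdge without any uniqueId.
    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner;

        Edge(int sourceId, int targetId, String partitioner) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Edge)) {
                return false;
            }
            Edge other = (Edge) o;
            return sourceId == other.sourceId
                    && targetId == other.targetId
                    && partitioner.equals(other.partitioner);
        }

        @Override
        public int hashCode() {
            return Objects.hash(sourceId, targetId, partitioner);
        }
    }

    // Assigns writer i to the i-th output edge in a map keyed by edge value,
    // then resolves the writer for each edge through that map.
    static int[] resolveWriters(List<Edge> outEdges) {
        Map<Edge, Integer> writerByEdge = new HashMap<>();
        for (int i = 0; i < outEdges.size(); i++) {
            // An equal key overwrites the entry stored for an earlier edge.
            writerByEdge.put(outEdges.get(i), i);
        }
        int[] resolved = new int[outEdges.size()];
        for (int i = 0; i < outEdges.size(); i++) {
            resolved[i] = writerByEdge.get(outEdges.get(i));
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Self-union: node 1 feeds node 2 twice with the same partitioning.
        Edge first = new Edge(1, 2, "REBALANCE");
        Edge second = new Edge(1, 2, "REBALANCE");
        int[] writers = resolveWriters(Arrays.asList(first, second));
        System.out.println(first.equals(second));          // true
        System.out.println(writers[0] + ", " + writers[1]); // 1, 1
    }
}
```

Both edges resolve to the same writer index, because the second `put` replaced the first entry instead of adding a new one.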

This was causing issues when constructing output collectors and picking the correct RecordWriters: StreamTask was not able to uniquely identify a given StreamEdge and was assigning the same RecordWriter to both edges. As a result, all stream elements were sent twice through the same RecordWriter. Since a self-union emits every element twice anyway, this was actually pretty harmless apart from calculating the combined watermark downstream: all watermarks were always coming from just one edge/input gate, while the unused edges were always stuck at the minimum watermark.
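
To see why a single silent input blocks progress, here is a simplified sketch of a combined watermark computed as the minimum over all inputs. It ignores idleness handling and other details of Flink's actual watermark logic; `WatermarkMinDemo` and `onWatermark` are hypothetical names used only for illustration.

```java
import java.util.Arrays;

// Hypothetical, simplified model: the combined watermark of an operator is the
// minimum of the latest watermark seen on each of its inputs.
class WatermarkMinDemo {

    private final long[] inputWatermarks;

    WatermarkMinDemo(int numInputs) {
        inputWatermarks = new long[numInputs];
        // Inputs that have not received any watermark start at the lowest value.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark on one input and returns the new combined watermark.
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        return combined();
    }

    long combined() {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        WatermarkMinDemo twoInputs = new WatermarkMinDemo(2);
        // All watermarks arrive on input 0 only; input 1 never advances.
        twoInputs.onWatermark(0, 1_000L);
        twoInputs.onWatermark(0, 5_000L);
        System.out.println(twoInputs.combined() == Long.MIN_VALUE); // true: stuck
        // Only once input 1 also advances does the combined watermark move.
        twoInputs.onWatermark(1, 5_000L);
        System.out.println(twoInputs.combined()); // 5000
    }
}
```

No matter how far input 0 advances, the minimum stays at `Long.MIN_VALUE` until the other input reports a watermark too.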

As a solution, we are making sure that StreamEdges are unique by introducing a uniqueId field, which is incremented for every additional StreamEdge connecting the same pair of nodes.
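
Below is a minimal sketch of the idea. It assumes the counter is derived from the number of edges that already connect the same source and target, and that `uniqueId` takes part in `equals`/`hashCode`. `UniqueEdgeDemo`, `Edge` and `addEdge` are hypothetical and are not the actual Flink `StreamGraph`/`StreamEdge` code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class UniqueEdgeDemo {

    // Hypothetical edge that also carries a uniqueId, so parallel edges differ.
    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner;
        final int uniqueId;

        Edge(int sourceId, int targetId, String partitioner, int uniqueId) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
            this.uniqueId = uniqueId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Edge)) {
                return false;
            }
            Edge other = (Edge) o;
            return sourceId == other.sourceId
                    && targetId == other.targetId
                    && uniqueId == other.uniqueId
                    && partitioner.equals(other.partitioner);
        }

        @Override
        public int hashCode() {
            return Objects.hash(sourceId, targetId, partitioner, uniqueId);
        }
    }

    private final List<Edge> edges = new ArrayList<>();

    // Adds an edge whose uniqueId counts the edges already connecting the same two nodes.
    Edge addEdge(int sourceId, int targetId, String partitioner) {
        int uniqueId = 0;
        for (Edge e : edges) {
            if (e.sourceId == sourceId && e.targetId == targetId) {
                uniqueId++;
            }
        }
        Edge edge = new Edge(sourceId, targetId, partitioner, uniqueId);
        edges.add(edge);
        return edge;
    }

    public static void main(String[] args) {
        UniqueEdgeDemo graph = new UniqueEdgeDemo();
        // Self-union: two edges from node 1 to node 2 with the same partitioning.
        Edge first = graph.addEdge(1, 2, "REBALANCE");
        Edge second = graph.addEdge(1, 2, "REBALANCE");
        Map<Edge, Integer> writerByEdge = new HashMap<>();
        writerByEdge.put(first, 0);
        writerByEdge.put(second, 1);
        System.out.println(first.equals(second)); // false
        System.out.println(writerByEdge.get(first) + ", " + writerByEdge.get(second)); // 0, 1
    }
}
```

With the extra field, the two self-union edges are distinct map keys, so each one keeps its own writer.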

> StreamEdges are not unique in self-union blocking propagation of watermarks
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-25199
>                 URL: https://issues.apache.org/jira/browse/FLINK-25199
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.14.3
>            Reporter: Timo Walther
>            Assignee: Piotr Nowojski
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> It seems that {{fromValues}} generating multiple rows does not emit any watermarks:
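> {code}
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);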
>         Table inputTable =
>                 tEnv.fromValues(
>                         DataTypes.ROW(
>                                 DataTypes.FIELD("weight", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f0", DataTypes.STRING()),
>                                 DataTypes.FIELD("f1", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f2", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f3", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("f4", DataTypes.INT()),
>                                 DataTypes.FIELD("label", DataTypes.STRING())),
>                         Row.of(1., "a", 1., 1., 1., 2, "l1"),
>                         Row.of(1., "a", 1., 1., 1., 2, "l1"));
>         DataStream<Row> input = tEnv.toDataStream(inputTable);
> {code}
> {{fromValues(1, 2, 3)}} or {{fromValues}} with only 1 row works correctly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)