Posted to issues@flink.apache.org by "Danish Amjad (Jira)" <ji...@apache.org> on 2020/05/09 03:07:00 UTC

[jira] [Comment Edited] (FLINK-17578) Union of 2 SideOutputs behaviour incorrect

    [ https://issues.apache.org/jira/browse/FLINK-17578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103074#comment-17103074 ] 

Danish Amjad edited comment on FLINK-17578 at 5/9/20, 3:06 AM:
---------------------------------------------------------------

The *root cause* of this problem lies in the implementation of [StreamEdge|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java].

The hashCode() and equals() methods compare only *edgeId*; they do not check *outputTag*.
 The *edgeId* is formed by concatenating information such as sourceVertex, targetVertex, etc. This information is identical for the even and odd side outputs in the union operation, so the two edges are treated as equal.

This affects *OperatorChain* construction, where a *Map* of edge/writer pairs is maintained. Because both edges compare equal, the first entry is always overwritten by the second. This Map is later used to build the *allOutputs* list, which then contains the same writer twice, so the elements of one side output are printed twice.
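
To make the overwrite concrete, here is a minimal standalone sketch. It is not Flink code: IdOnlyEdge, EdgeMapDemo, the "1_2" id and the writer names are hypothetical stand-ins, and the only assumption carried over from StreamEdge is that equality is based on the edge id alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class EdgeMapDemo {
    // Mimics the flow described above: register one writer per edge,
    // then look each edge up again to build the list of outputs.
    static List<String> collectOutputs() {
        IdOnlyEdge odd = new IdOnlyEdge("1_2", "odds");
        IdOnlyEdge even = new IdOnlyEdge("1_2", "even");

        Map<IdOnlyEdge, String> writers = new HashMap<>();
        writers.put(odd, "oddWriter");
        writers.put(even, "evenWriter"); // equal key, so this replaces "oddWriter"

        List<String> allOutputs = new ArrayList<>();
        allOutputs.add(writers.get(odd));
        allOutputs.add(writers.get(even));
        return allOutputs;
    }

    public static void main(String[] args) {
        System.out.println(collectOutputs()); // prints [evenWriter, evenWriter]
    }
}

// Hypothetical stand-in for an edge whose equality ignores the output tag.
final class IdOnlyEdge {
    final String edgeId;
    final String outputTag;

    IdOnlyEdge(String edgeId, String outputTag) {
        this.edgeId = edgeId;
        this.outputTag = outputTag;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof IdOnlyEdge && edgeId.equals(((IdOnlyEdge) o).edgeId);
    }

    @Override
    public int hashCode() {
        return edgeId.hashCode();
    }
}
```

The second put wins, which matches the reported output of odds.union(evens) printing only the even values, each twice.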

I have tested a fix based on this analysis and it works. Here is my code:
[https://github.com/damjad/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java]

Please let me know if any further changes are needed (maybe a more sophisticated hashCode?)
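
For discussion, one possible shape for equals/hashCode that also takes the output tag into account is sketched below. This is a standalone illustration, not the code in the linked branch: TaggedEdge and TaggedEdgeDemo are hypothetical names, and the tag is modelled as a nullable String rather than a real OutputTag.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class TaggedEdgeDemo {
    // Two edges with the same id but different tags should stay separate keys.
    static int distinctKeys() {
        Map<TaggedEdge, String> writers = new HashMap<>();
        writers.put(new TaggedEdge("1_2", "odds"), "oddWriter");
        writers.put(new TaggedEdge("1_2", "even"), "evenWriter");
        return writers.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctKeys()); // prints 2
    }
}

// Hypothetical edge whose identity includes the output tag.
final class TaggedEdge {
    final String edgeId;
    final String outputTag; // assumed to be null for edges that are not side outputs

    TaggedEdge(String edgeId, String outputTag) {
        this.edgeId = edgeId;
        this.outputTag = outputTag;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TaggedEdge)) {
            return false;
        }
        TaggedEdge other = (TaggedEdge) o;
        // Objects.equals is null-safe, so untagged edges still compare correctly.
        return edgeId.equals(other.edgeId) && Objects.equals(outputTag, other.outputTag);
    }

    @Override
    public int hashCode() {
        // Objects.hash accepts null elements and combines both fields.
        return Objects.hash(edgeId, outputTag);
    }
}
```

Objects.hash keeps hashCode consistent with equals without a hand-rolled formula; whether the real tag type has a suitable equals/hashCode of its own would need to be checked.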


> Union of 2 SideOutputs behaviour incorrect
> ------------------------------------------
>
>                 Key: FLINK-17578
>                 URL: https://issues.apache.org/jira/browse/FLINK-17578
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.10.0
>            Reporter: Tom Wells
>            Priority: Major
>
> Strange behaviour when using union() to merge outputs of 2 DataStreams, where both are sourced from SideOutputs.
> See example code with comments demonstrating the issue:
> {code:scala}
>   def main(args: Array[String]): Unit = {
>     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>     val input = env.fromElements(1, 2, 3, 4)
>     val oddTag = OutputTag[Int]("odds")
>     val evenTag = OutputTag[Int]("even")
>     val all =
>       input.process {
>         (value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]) => {
>           if (value % 2 != 0)
>             ctx.output(oddTag, value)
>           else
>             ctx.output(evenTag, value)
>         }
>       }
>     val odds = all.getSideOutput(oddTag)
>     val evens = all.getSideOutput(evenTag)
>     // These print correctly
>     //
>     odds.print                  // -> 1, 3
>     evens.print                 // -> 2, 4
>     // This prints incorrectly - BUG?
>     //
>     odds.union(evens).print       // -> 2, 2, 4, 4
>     evens.union(odds).print       // -> 1, 1, 3, 3
>     // Another test to understand normal behaviour of .union, using normal inputs
>     //
>     val odds1 = env.fromElements(1, 3)
>     val evens1 = env.fromElements(2, 4)
>     // Union of 2 normal inputs
>     //
>     odds1.union(evens1).print   // -> 1, 2, 3, 4
>     // Union of a normal input plus an input from a sideoutput
>     //
>     odds.union(evens1).print    // -> 1, 2, 3, 4
>     evens1.union(odds).print    // -> 1, 2, 3, 4
>     //
>     // So it seems that when both inputs are from sideoutputs that it behaves incorrectly... BUG?
>     env.execute("Test job")
>   }
> {code}


