Posted to issues@flink.apache.org by "liupengcheng (Jira)" <ji...@apache.org> on 2020/08/06 04:08:00 UTC

[jira] [Updated] (FLINK-18830) JoinCoGroupFunction and FlatJoinCoGroupFunction work incorrectly for outer join when one side of coGroup is empty

     [ https://issues.apache.org/jira/browse/FLINK-18830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liupengcheng updated FLINK-18830:
---------------------------------
    Description: 
Currently, `JoinCoGroupFunction` and `FlatJoinCoGroupFunction` in `JoinedStreams` do not respect the join type; both are implemented as a join inside a two-level nested loop. This is incorrect for an outer join when one side of the coGroup is empty.

```
	public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
		for (T1 val1: first) {
			for (T2 val2: second) {
				wrappedFunction.join(val1, val2, out);
			}
		}
	}
```
The code above is the current implementation. Suppose the first input is non-empty and the second input is an empty iterator: the inner loop body never runs, so the join function (`wrappedFunction`) is never called. As a result, no data is emitted for a left outer join.

So I propose taking the join type into account here and handling this case. For example, for a left outer join, we can emit a record with the right side set to null when the right side is empty or no match can be found on the right side.
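
A minimal sketch of how this could look for a left outer join, not Flink's actual code. The `Collector` and `FlatJoinFunction` interfaces below are simplified stand-ins declared locally so the example compiles without Flink, and `innerCoGroup` / `leftOuterCoGroup` are hypothetical names. It also assumes the user's join function tolerates a null right-hand value, which is one possible reading of "emit a record with the right side set to null".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LeftOuterCoGroupSketch {

	/** Hypothetical stand-in for Flink's Collector, reduced to a single method. */
	interface Collector<T> {
		void collect(T record);
	}

	/** Hypothetical stand-in with the same shape as the wrappedFunction.join(...) call. */
	interface FlatJoinFunction<T1, T2, T> {
		void join(T1 first, T2 second, Collector<T> out) throws Exception;
	}

	/** Mirrors the current nested loop: nothing is emitted when `second` is empty. */
	static <T1, T2, T> void innerCoGroup(
			Iterable<T1> first, Iterable<T2> second,
			FlatJoinFunction<T1, T2, T> fn, Collector<T> out) throws Exception {
		for (T1 val1 : first) {
			for (T2 val2 : second) {
				fn.join(val1, val2, out);
			}
		}
	}

	/** Assumed left outer variant: a left record with no right partner is joined with null. */
	static <T1, T2, T> void leftOuterCoGroup(
			Iterable<T1> first, Iterable<T2> second,
			FlatJoinFunction<T1, T2, T> fn, Collector<T> out) throws Exception {
		for (T1 val1 : first) {
			boolean matched = false;
			for (T2 val2 : second) {
				matched = true;
				fn.join(val1, val2, out);
			}
			if (!matched) {
				// Right side of the group is empty: pad with null instead of dropping val1.
				fn.join(val1, null, out);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		List<String> left = Arrays.asList("a", "b");
		List<Integer> right = Collections.emptyList();
		FlatJoinFunction<String, Integer, String> fn = (l, r, out) -> out.collect(l + "," + r);

		List<String> innerOut = new ArrayList<>();
		innerCoGroup(left, right, fn, innerOut::add);
		System.out.println(innerOut); // prints []

		List<String> outerOut = new ArrayList<>();
		leftOuterCoGroup(left, right, fn, outerOut::add);
		System.out.println(outerOut); // prints [a,null, b,null]
	}
}
```

Because all records in one coGroup invocation share the same key, "no match on the right side" and "right side is empty" coincide in this sketch. A right outer join would need the symmetric check on `first`.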

  was:
Currently, `JoinCoGroupFunction` and `FlatJoinCoGroupFunction` in `JoinedStreams` do not respect the join type; both are implemented as a join inside a two-level nested loop. This is incorrect for an outer join when one side of the coGroup is empty.

```
	public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
		for (T1 val1: first) {
			for (T2 val2: second) {
				wrappedFunction.join(val1, val2, out);
			}
		}
	}
```
The code above is the current implementation. Suppose the first input is non-empty and the second input is an empty iterator: the inner loop body never runs, so the join function (`wrappedFunction`) is never called. As a result, no data is emitted for a left outer join.

So I propose taking the join type into account here and handling this case. For example, for a left outer join, we can call the join function with the right side set to null when the right side is empty, so that the join function can handle the actual join.


> JoinCoGroupFunction and FlatJoinCoGroupFunction work incorrectly for outer join when one side of coGroup is empty
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18830
>                 URL: https://issues.apache.org/jira/browse/FLINK-18830
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: liupengcheng
>            Priority: Major
>
> Currently, `JoinCoGroupFunction` and `FlatJoinCoGroupFunction` in `JoinedStreams` do not respect the join type; both are implemented as a join inside a two-level nested loop. This is incorrect for an outer join when one side of the coGroup is empty.
> ```
> 	public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
> 		for (T1 val1: first) {
> 			for (T2 val2: second) {
> 				wrappedFunction.join(val1, val2, out);
> 			}
> 		}
> 	}
> ```
> The code above is the current implementation. Suppose the first input is non-empty and the second input is an empty iterator: the inner loop body never runs, so the join function (`wrappedFunction`) is never called. As a result, no data is emitted for a left outer join.
> So I propose taking the join type into account here and handling this case. For example, for a left outer join, we can emit a record with the right side set to null when the right side is empty or no match can be found on the right side.


