Posted to issues@flink.apache.org by "Dawid Wysakowicz (JIRA)" <ji...@apache.org> on 2018/12/13 14:12:00 UTC

[jira] [Comment Edited] (FLINK-11084) Incorrect output after two consecutive split and select

    [ https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720207#comment-16720207 ] 

Dawid Wysakowicz edited comment on FLINK-11084 at 12/13/18 2:11 PM:
--------------------------------------------------------------------

Hi [~dangdangdang]
My feeling is that {{split/select}} is a very buggy feature, and the general consensus is that it should be removed and subsumed by side outputs. Therefore I would not make it any more user-friendly ;) I would actually be in favor of throwing an exception. To support my claim: I think this fix still does not handle the case where split is applied on top of a union (and who knows how many more):

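{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class SplitOnUnionReproduction {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> dataStream = env.fromElements("a1", "a2");

		// First split: a1 -> "name1", everything else -> "name2".
		SplitStream<String> splitStream1 = dataStream.split(new OutputSelector<String>() {
			@Override public Iterable<String> select(String value) {
				List<String> output = new ArrayList<>(1);
				if (value.equals("a1")) {
					output.add("name1");
				} else {
					output.add("name2");
				}
				return output;
			}
		});

		// Second split on top of select("name1"): a1 -> "name4", everything else -> "name5".
		DataStream<String> splitStream2 = splitStream1.select("name1").split(new OutputSelector<String>() {
			@Override public Iterable<String> select(String value) {
				List<String> output = new ArrayList<>(1);
				if (value.equals("a1")) {
					output.add("name4");
				} else {
					output.add("name5");
				}
				return output;
			}
		}).select("name4");

		// Third split on top of a union of select("name1") and splitStream2.
		// Both of its names are selected, so it should not drop anything, and a2
		// (already filtered out by select("name1")) must not reappear.
		DataStream<String> splitStream3 = splitStream1.select("name1").union(splitStream2)
			.split(new OutputSelector<String>() {
				@Override public Iterable<String> select(String value) {
					List<String> output = new ArrayList<>(1);
					if (value.equals("a1")) {
						output.add("name6");
					} else {
						output.add("name7");
					}
					return output;
				}
			}).select("name7", "name6");

		// Print every stream with a prefix identifying it.
		splitStream1.addSink(new SinkFunction<String>() {
			@Override
			public void invoke(String value, Context context) throws Exception {
				System.out.println("1> " + value);
			}
		});
		splitStream2.addSink(new SinkFunction<String>() {
			@Override
			public void invoke(String value, Context context) throws Exception {
				System.out.println("2> " + value);
			}
		});
		splitStream3.addSink(new SinkFunction<String>() {
			@Override
			public void invoke(String value, Context context) throws Exception {
				System.out.println("3> " + value);
			}
		});
		env.execute();
	}
}
{code}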

The result is:

{code}
//splitStream1 - correct
1> a1
1> a2

//splitStream2 - correct
2> a1

//splitStream3 - incorrect, there should only be a1
3> a1
3> a2
{code}
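To make the expected behaviour concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes that every split/select step materializes its own per-name output, so a later split only sees the records an earlier select let through (roughly what one side output per name would give). The class name and the `route`/`select` helpers are hypothetical and not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of the reproduction above; no Flink classes are used.
public class ExpectedSplitSemantics {

    // Sends each record to the one output name chosen by `selector` and keeps
    // a separate list per name (one materialized output per name).
    static Map<String, List<String>> route(List<String> input, Function<String, String> selector) {
        Map<String, List<String>> outputs = new LinkedHashMap<>();
        for (String value : input) {
            outputs.computeIfAbsent(selector.apply(value), name -> new ArrayList<>()).add(value);
        }
        return outputs;
    }

    // Concatenates the lists stored under the given names, in argument order.
    static List<String> select(Map<String, List<String>> outputs, String... names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            result.addAll(outputs.getOrDefault(name, Collections.emptyList()));
        }
        return result;
    }

    // Computes what splitStream3 should contain under the assumed semantics.
    static List<String> expectedStream3() {
        List<String> input = Arrays.asList("a1", "a2");
        List<String> name1 = select(route(input, v -> v.equals("a1") ? "name1" : "name2"), "name1");
        List<String> stream2 = select(route(name1, v -> v.equals("a1") ? "name4" : "name5"), "name4");
        List<String> union = new ArrayList<>(name1);
        union.addAll(stream2);
        return select(route(union, v -> v.equals("a1") ? "name6" : "name7"), "name7", "name6");
    }

    public static void main(String[] args) {
        System.out.println(expectedStream3()); // prints [a1, a1]
    }
}
```

Under these assumed semantics splitStream3 holds only a1, twice, because select("name1") feeds the union both directly and through splitStream2. In a real Flink job the order of records across the union is not guaranteed.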


> Incorrect output after two consecutive split and select
> -------------------------------------------------------
>
>                 Key: FLINK-11084
>                 URL: https://issues.apache.org/jira/browse/FLINK-11084
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.2
>
>
> The second OutputSelector of two successive split and select calls does not actually depend on the first one: both selectors end up in the same OutputSelector array in DirectedOutput.
> For example:
> outputSelector1 => \{"name1" or "name2"}
> outputSelector2 => \{"name3" or "name4"}
> resultStream = dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> Expected result for input \{StreamRecord1}:
> outputSelector1 => \{"name1"}
> outputSelector2 => \{"name3"}
> resultStream => {} (outputSelector1 routes the record to "name1", which select("name2") discards)
> Actual result:
> resultStream => \{StreamRecord1}
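
The routing described in the issue can be illustrated with a simplified plain-Java model. This is a sketch under stated assumptions, not Flink's actual DirectedOutput code: it assumes that, with the bug, the output names of all chained selectors are pooled and matched only against the last select(...), whereas the expected behaviour requires every split/select stage to accept the record. The `ChainedSelectModel` and `Stage` names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified model of chained split/select routing; this is NOT
// Flink's DirectedOutput implementation, only an illustration of the issue text.
public class ChainedSelectModel {

    // One split(...).select(...) step: a selector naming outputs for a record,
    // plus the names kept by the following select(...).
    static final class Stage {
        final Function<String, Set<String>> selector;
        final Set<String> selectedNames;

        Stage(Function<String, Set<String>> selector, String... selectedNames) {
            this.selector = selector;
            this.selectedNames = new HashSet<>(Arrays.asList(selectedNames));
        }

        boolean accepts(String record) {
            return !Collections.disjoint(selector.apply(record), selectedNames);
        }
    }

    // Assumed buggy behaviour: selectors share one flat list, their output names
    // are pooled, and only the last select(...) is checked.
    static boolean emittedFlattened(String record, List<Stage> stages) {
        Set<String> pooled = new HashSet<>();
        for (Stage stage : stages) {
            pooled.addAll(stage.selector.apply(record));
        }
        Set<String> lastSelect = stages.get(stages.size() - 1).selectedNames;
        return !Collections.disjoint(pooled, lastSelect);
    }

    // Expected behaviour: a record reaches the result only if every stage accepts it.
    static boolean emittedChained(String record, List<Stage> stages) {
        for (Stage stage : stages) {
            if (!stage.accepts(record)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Mirrors the issue: selector1 names StreamRecord1 "name1", selector2 names it "name3".
        List<Stage> stages = Arrays.asList(
                new Stage(r -> Collections.singleton("name1"), "name2"),
                new Stage(r -> Collections.singleton("name3"), "name3"));

        System.out.println(emittedChained("StreamRecord1", stages));   // prints false
        System.out.println(emittedFlattened("StreamRecord1", stages)); // prints true
    }
}
```

With the issue's example, the per-stage check drops StreamRecord1 while the pooled check emits it, which matches the reported actual result.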



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)