Posted to issues@flink.apache.org by "Dawid Wysakowicz (JIRA)" <ji...@apache.org> on 2018/12/13 14:12:00 UTC
[jira] [Comment Edited] (FLINK-11084) Incorrect output after two
consecutive split and select
[ https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720207#comment-16720207 ]
Dawid Wysakowicz edited comment on FLINK-11084 at 12/13/18 2:11 PM:
--------------------------------------------------------------------
Hi [~dangdangdang]
My feeling is that {{split/select}} is a very buggy feature, and the general consensus is that it should be removed and replaced by side outputs. Therefore I would not make it any more user-friendly ;) I would actually be in favor of throwing an exception. To support my claim: I think this fix still does not solve the case where we apply split on top of union (and who knows how many more):
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromElements("a1", "a2");

// First split: "a1" -> name1, everything else -> name2
SplitStream<String> splitStream1 = dataStream.split(new OutputSelector<String>() {
    @Override
    public Iterable<String> select(String value) {
        List<String> output = new ArrayList<>(1);
        if (value.equals("a1")) {
            output.add("name1");
        } else {
            output.add("name2");
        }
        return output;
    }
});

// Second split on top of select("name1"): "a1" -> name4, everything else -> name5
DataStream<String> splitStream2 = splitStream1.select("name1").split(new OutputSelector<String>() {
    @Override
    public Iterable<String> select(String value) {
        List<String> output = new ArrayList<>(1);
        if (value.equals("a1")) {
            output.add("name4");
        } else {
            output.add("name5");
        }
        return output;
    }
}).select("name4");

// Third split on top of a union: "a1" -> name6, everything else -> name7
DataStream<String> splitStream3 = splitStream1.select("name1").union(splitStream2)
    .split(new OutputSelector<String>() {
        @Override
        public Iterable<String> select(String value) {
            List<String> output = new ArrayList<>(1);
            if (value.equals("a1")) {
                output.add("name6");
            } else {
                output.add("name7");
            }
            return output;
        }
    }).select("name7", "name6");

splitStream1.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("1> " + value);
    }
});

splitStream2.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("2> " + value);
    }
});

splitStream3.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("3> " + value);
    }
});

env.execute();
{code}
The result is:
{code}
//splitStream1 - correct
1> a1
1> a2
//splitStream2 - correct
2> a1
//splitStream3 - incorrect, there should only be a1
3> a1
3> a2
{code}
> Incorrect output after two consecutive split and select
> -------------------------------------------------------
>
> Key: FLINK-11084
> URL: https://issues.apache.org/jira/browse/FLINK-11084
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Reporter: Shimin Yang
> Assignee: Shimin Yang
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.2
>
>
> The second OutputSelector of two successive split and select operations does not actually depend on the first one. Both selectors end up in the same array of OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream = dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> expectedResult for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> actualResult:
> resultStream => {StreamRecord1}
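
To make the quoted description concrete, here is a minimal plain-Java sketch of the two behaviours. It is not Flink code and does not use any Flink API: the class SplitSelectModel, the Stage type, and the record "r1" (standing in for StreamRecord1) are hypothetical. The flat variant is an assumption about what "the same array of OutputSelector" implies, namely that the names produced by all selectors are pooled and compared only against the names of the last select.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of the behaviour described in the issue; not Flink's implementation.
class SplitSelectModel {

    /** One split(selector).select(names...) step. */
    static final class Stage {
        final Function<String, List<String>> selector;
        final List<String> selectedNames;

        Stage(Function<String, List<String>> selector, String... selectedNames) {
            this.selector = selector;
            this.selectedNames = Arrays.asList(selectedNames);
        }
    }

    /** Expected behaviour: a record must be selected by every stage, in order. */
    static List<String> chained(List<String> input, List<Stage> stages) {
        List<String> out = new ArrayList<>();
        for (String record : input) {
            boolean selected = true;
            for (Stage stage : stages) {
                List<String> names = new ArrayList<>(stage.selector.apply(record));
                names.retainAll(stage.selectedNames);
                if (names.isEmpty()) {
                    selected = false;
                    break;
                }
            }
            if (selected) {
                out.add(record);
            }
        }
        return out;
    }

    /**
     * Assumed flat behaviour: every selector runs on every record, the produced
     * names are pooled, and only the last select's names decide the outcome.
     */
    static List<String> flat(List<String> input, List<Stage> stages) {
        List<String> out = new ArrayList<>();
        if (stages.isEmpty()) {
            out.addAll(input);
            return out;
        }
        List<String> lastSelected = stages.get(stages.size() - 1).selectedNames;
        for (String record : input) {
            List<String> produced = new ArrayList<>();
            for (Stage stage : stages) {
                produced.addAll(stage.selector.apply(record));
            }
            produced.retainAll(lastSelected);
            if (!produced.isEmpty()) {
                out.add(record);
            }
        }
        return out;
    }

    /** The pipeline from the issue: split(s1).select("name2").split(s2).select("name3"). */
    static List<Stage> exampleStages() {
        Function<String, List<String>> selector1 =
            r -> Arrays.asList(r.equals("r1") ? "name1" : "name2");
        Function<String, List<String>> selector2 =
            r -> Arrays.asList(r.equals("r1") ? "name3" : "name4");
        return Arrays.asList(new Stage(selector1, "name2"), new Stage(selector2, "name3"));
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("r1");
        System.out.println("chained: " + chained(input, exampleStages())); // chained: []
        System.out.println("flat:    " + flat(input, exampleStages()));    // flat:    [r1]
    }
}
```

In this model the record is dropped by the chained evaluation because the first select("name2") rejects it, while the flat evaluation lets it through because "name3" appears among the pooled names. That matches the expectedResult and actualResult listed in the description.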