Posted to issues@flink.apache.org by "Ziv Meri (JIRA)" <ji...@apache.org> on 2017/07/11 13:31:00 UTC
[jira] [Comment Edited] (FLINK-5031) Consecutive DataStream.split() ignored
[ https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082196#comment-16082196 ]
Ziv Meri edited comment on FLINK-5031 at 7/11/17 1:30 PM:
----------------------------------------------------------
[~aljoscha] I checked the functionality of Side Outputs and it works great for me.
I don't know whether it will provide a solution for every case where split/select is needed, but for my case it does the job.
> Consecutive DataStream.split() ignored
> --------------------------------------
>
> Key: FLINK-5031
> URL: https://issues.apache.org/jira/browse/FLINK-5031
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.2.0, 1.1.3
> Reporter: Fabian Hueske
> Assignee: Renkai Ge
>
> The output of the following program
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>     long threshold;
>
>     public ThresholdSelector(long threshold) {
>         this.threshold = threshold;
>     }
>
>     @Override
>     public Iterable<String> select(Long value) {
>         if (value < threshold) {
>             return Collections.singletonList("Less");
>         } else {
>             return Collections.singletonList("GreaterEqual");
>         }
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     SplitStream<Long> split1 = env.generateSequence(1, 11)
>         .split(new ThresholdSelector(6));
>     // stream11 should be [1,2,3,4,5]
>     DataStream<Long> stream11 = split1.select("Less");
>     SplitStream<Long> split2 = stream11
>         // .map(new MapFunction<Long, Long>() {
>         //     @Override
>         //     public Long map(Long value) throws Exception {
>         //         return value;
>         //     }
>         // })
>         .split(new ThresholdSelector(3));
>     DataStream<Long> stream21 = split2.select("Less");
>     // stream21 should be [1,2]
>     stream21.print();
>     env.execute();
> }
> {code}
> should be {{1, 2}}, but it is {{1, 2, 3, 4, 5}}. It seems that the second {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added to the program.
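
For reference, the expected result of the two consecutive splits can be modelled without Flink as two successive select steps over a list. The sketch below is plain Java and not Flink code: the class name ConsecutiveSplitModel and the helpers thresholdSelector and splitAndSelect are hypothetical and only mirror the logic of the ThresholdSelector in the report. It illustrates what stream21 should contain and says nothing about how Flink implements split/select internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the expected split/select semantics; this is not Flink code.
class ConsecutiveSplitModel {

    // Mirrors the ThresholdSelector from the report: names each value "Less" or "GreaterEqual".
    static Function<Long, Iterable<String>> thresholdSelector(long threshold) {
        return value -> Collections.singletonList(value < threshold ? "Less" : "GreaterEqual");
    }

    // Hypothetical helper: keeps the elements for which the selector emits the requested name.
    static List<Long> splitAndSelect(List<Long> input,
                                     Function<Long, Iterable<String>> selector,
                                     String name) {
        List<Long> selected = new ArrayList<>();
        for (Long value : input) {
            for (String outputName : selector.apply(value)) {
                if (outputName.equals(name)) {
                    selected.add(value);
                    break;
                }
            }
        }
        return selected;
    }

    // Applies both splits in sequence to the values 1..11, as the reported program intends.
    static List<Long> expectedStream21() {
        List<Long> source = new ArrayList<>();
        for (long i = 1; i <= 11; i++) {
            source.add(i);
        }
        // First split with threshold 6: "Less" keeps [1, 2, 3, 4, 5].
        List<Long> stream11 = splitAndSelect(source, thresholdSelector(6), "Less");
        // Second split with threshold 3, applied to the already selected elements.
        return splitAndSelect(stream11, thresholdSelector(3), "Less");
    }

    public static void main(String[] args) {
        System.out.println(expectedStream21()); // prints [1, 2]
    }
}
```

In this model the second selector only sees the elements kept by the first one, which is the behaviour the report expects from the second split.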
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)