Posted to user@flink.apache.org by Yassin Marzouki <ya...@gmail.com> on 2016/08/11 12:29:22 UTC

Strange behaviour of the flatMap Collector

Hi all,

When I call out.collect() twice inside a flatMap, the output is occasionally
out of order, seemingly at random. Take this example:

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment();
env.generateSequence(1, 100000)
    .flatMap((Long t, Collector<String> out) -> {
        out.collect("line1");
        out.collect("line2");
    })
    .writeAsText("test", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
env.execute("Test");

I expect the output to be
line1
line2
line1
line2
...

But some resulting lines (18 out of 200000) were:
line2
line2
and the same for line1.

What could be the reason for this?

Best,
Yassine

Re: Strange behaviour of the flatMap Collector

Posted by Yassin Marzouki <ya...@gmail.com>.
Indeed, using the same parallelism corrected the output. Thank you!

On Thu, Aug 11, 2016 at 2:34 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> The source runs in parallel (n tasks), but the sink has a parallelism of 1.
> The sink hence has to merge the parallel streams from the source, which
> happens based on the arrival speed of the streams, i.e., it's not
> deterministic. That's why you see the lines being mixed.
>
> Try running the source and the sink with the same parallelism; then no
> merge of streams needs to happen. You'll then see that, per output file,
> the lines are in the correct order.
>
> Stephan
>
>

Re: Strange behaviour of the flatMap Collector

Posted by Stephan Ewen <se...@apache.org>.
Hi!

The source runs in parallel (n tasks), but the sink has a parallelism of 1.
The sink hence has to merge the parallel streams from the source, which
happens based on the arrival speed of the streams, i.e., it's not
deterministic. That's why you see the lines being mixed.
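
To make the merge effect concrete, here is a small plain-Java sketch (no Flink involved) of two ordered streams being merged by arrival order. The class MergeDemo, its methods, and the explicit "schedule" array are hypothetical names invented for this illustration; the schedule simply stands in for whatever arrival order the runtime happens to produce. It is a toy model of the behaviour described above, not Flink's actual merge code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergeDemo {

    // Merge several ordered streams into one, taking the next record from
    // whichever stream the schedule names. The schedule models "arrival
    // order at the sink", which a real runtime does not fix in advance.
    static List<String> mergeByArrival(List<List<String>> streams, int[] schedule) {
        int[] next = new int[streams.size()]; // read position per stream
        List<String> merged = new ArrayList<>();
        for (int s : schedule) {
            merged.add(streams.get(s).get(next[s]++));
        }
        return merged;
    }

    // True if the records strictly alternate line1, line2, line1, line2, ...
    static boolean isAlternating(List<String> records) {
        for (int i = 0; i < records.size(); i++) {
            String expected = (i % 2 == 0) ? "line1" : "line2";
            if (!expected.equals(records.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> pair = Arrays.asList("line1", "line2");
        List<List<String>> streams = Arrays.asList(pair, pair);

        // Each stream's pair arrives back to back: the order looks as expected.
        System.out.println(mergeByArrival(streams, new int[] {0, 0, 1, 1}));
        // Stream 1's pair arrives between the two records of stream 0.
        System.out.println(mergeByArrival(streams, new int[] {0, 1, 1, 0}));
    }
}
```

Each stream on its own is still in order in both cases; only the merged view shows two equal lines in a row, which matches the "line2, line2" (and "line1, line1") pattern reported in the question.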

Try running the source and the sink with the same parallelism; then no
merge of streams needs to happen. You'll then see that, per output file,
the lines are in the correct order.

Stephan
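
For reference, a minimal sketch of the job from the question with the suggested change applied, i.e. without setParallelism(1) on the sink, so that the sink inherits the same parallelism as the upstream operators. It assumes the Flink 1.x DataStream API used in the thread (generateSequence and writeAsText are deprecated in later releases). The class name SameParallelismJob is made up for this example, and an anonymous FlatMapFunction replaces the lambda only to avoid depending on lambda type extraction. It needs the Flink streaming dependency on the classpath to compile.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SameParallelismJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();

        env.generateSequence(1, 100000)
            .flatMap(new FlatMapFunction<Long, String>() {
                @Override
                public void flatMap(Long t, Collector<String> out) {
                    out.collect("line1");
                    out.collect("line2");
                }
            })
            // No setParallelism(1) here: the sink runs with the same
            // parallelism as the flatMap, so no streams have to be merged.
            .writeAsText("test", FileSystem.WriteMode.OVERWRITE);

        env.execute("Test");
    }
}
```

With a parallelism greater than 1, "test" becomes a directory containing one file per parallel sink task rather than a single file. Alternatively, calling env.setParallelism(1) on the environment should also keep everything in a single, ordered stream, at the cost of running the whole job sequentially.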

