Posted to user@flink.apache.org by Kostya Kulagin <kk...@gmail.com> on 2016/04/20 23:52:52 UTC

Values are missing, probably due parallelism?

I think it has something to do with parallelism, and I probably do not have
a clear understanding of how parallelism works in Flink, but in this example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        LongStream.range(0, 29).forEach(ctx::collect);
      }

      @Override
      public void cancel() {

      }
    });

    source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
      @Override
      public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
        for (Long value : values) {
          if (value % 3 == 0) {
            out.collect(value);
          }
        }
      }
    }).print();

    env.execute("yoyoyo");

Why does my output look like this?

4> 9
1> 0
1> 12
3> 6
3> 18
2> 3
2> 15

I.e., where is the value 24, for example? I expect to see it. What am I
doing wrong?

Re: Values are missing, probably due parallelism?

Posted by Kostya Kulagin <kk...@gmail.com>.
Thanks, so you were right, and it really is connected to the not-finishing
windows problem I mentioned in the other post.
I don't really need a parallelism of 1 for windows - I expect the operations
on windows to be pretty expensive, and I like the idea that I can
"parallelize" them.

Thanks for the explanation!


Re: Values are missing, probably due parallelism?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
no worries, I also had to read the doc to figure it out. :-)

I now see what the problem is. The .countWindowAll().apply() pattern
creates a WindowOperator with parallelism of 1 because the "count all" only
works if one instance of the window operator sees all elements. When you
manually change the parallelism, it essentially becomes a "count per
parallel instance" window operation, and the elements from the source with
parallelism 1 get distributed round-robin to the parallel instances of the
count-window operator. This means that it will take more elements emitted
from the source before each instance of the window fires. It's a bit
confusing, but Flink does not allow forcing the parallelism to 1 right now.
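
To make the round-robin effect concrete, here is a small plain-Java sketch
(no Flink involved). It is only a simplified model: it assumes the elements
are dealt strictly round-robin starting at instance 0, which the real
runtime need not do, and the class and method names are made up for
illustration. It counts how many count-windows of size 10 fire for 30
elements at different parallelism settings.

```java
class RoundRobinCountWindowSketch {

    // Simplified model (assumption): elements 0..elementCount-1 are dealt
    // round-robin to `parallelism` window instances, and each instance fires
    // and purges as soon as it has buffered `windowSize` elements.
    // Returns the total number of windows that fired.
    static int firedWindows(int elementCount, int parallelism, int windowSize) {
        int[] buffered = new int[parallelism];
        int fired = 0;
        for (int i = 0; i < elementCount; i++) {
            int instance = i % parallelism;   // round-robin assignment
            buffered[instance]++;
            if (buffered[instance] == windowSize) {
                fired++;                      // this instance's window fires
                buffered[instance] = 0;       // and is purged
            }
        }
        // Elements still buffered here never reach the window function.
        return fired;
    }

    public static void main(String[] args) {
        for (int p = 1; p <= 4; p++) {
            System.out.println("parallelism " + p + ": "
                + firedWindows(30, p, 10) + " window(s) fired");
        }
    }
}
```

Under these assumptions, parallelism 1 and 3 fire three windows (all 30
elements are processed), parallelism 2 fires only two (each instance keeps 5
elements buffered), and parallelism 4 fires none, because no instance ever
collects 10 elements.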

About using the snapshot version: I would suggest you don't use it unless
you absolutely need one of the features in there that is not yet released.
The builds are still pretty stable, however.

Cheers,
Aljoscha


Re: Values are missing, probably due parallelism?

Posted by Kostya Kulagin <kk...@gmail.com>.
First of all, you are right about the number of elements. My bad, and sorry
for the confusion; I need to get better at calculations :)

However, if I change the parallelism of the windowing to, let's say, 2 (of
course I changed 29 to 30 as well :) ), i.e. instead of

}).print();

put

}).setParallelism(2).print();

at the very bottom - I am getting:

3> 15
3> 12
2> 9
2> 6
4> 18
04/21/2016 07:47:08	Sink: Unnamed(2/4) switched to FINISHED
04/21/2016 07:47:08	Source: Custom Source(1/1) switched to FINISHED
04/21/2016 07:47:08	Sink: Unnamed(4/4) switched to FINISHED
04/21/2016 07:47:08	Sink: Unnamed(3/4) switched to FINISHED
04/21/2016 07:47:08	TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(2/2) switched to FINISHED
04/21/2016 07:47:08	TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(1/2) switched to FINISHED
1> 3
1> 0

With the default setting for parallelism it works fine, same as with values
3 and 1.

With 2 or 4+ it does not work. With 4+ it simply prints nothing. I.e., might
it have something to do with how threads are finishing their execution?

I am using the latest prod version I've found in Maven: 1.0.1.
Can snapshot versions be used in prod? I mean, how well tested are those?

I will try the same on the master branch later today.

Thanks!
Kostya



Re: Values are missing, probably due parallelism?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
which version of Flink are you using? Maybe there is a bug. I've tested it
on the git master (1.1-SNAPSHOT) and it works fine with varying degrees of
parallelism if I change the source to emit 30 elements:
LongStream.range(0, 30).forEach(ctx::collect);

(The second argument of LongStream.range(start, end) is exclusive)
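
As a quick check of the exclusive upper bound, the following plain-Java
sketch counts the values each call produces. The class and helper names are
only illustrative; LongStream.range and LongStream.rangeClosed are the
standard java.util.stream methods.

```java
import java.util.stream.LongStream;

class RangeBoundsSketch {

    // Number of values produced by LongStream.range(0, endExclusive).
    static long countExclusive(long endExclusive) {
        return LongStream.range(0, endExclusive).count();
    }

    // Number of values produced by LongStream.rangeClosed(0, endInclusive).
    static long countInclusive(long endInclusive) {
        return LongStream.rangeClosed(0, endInclusive).count();
    }

    public static void main(String[] args) {
        System.out.println(countExclusive(29)); // 29 values: 0..28
        System.out.println(countExclusive(30)); // 30 values: 0..29
        System.out.println(countInclusive(29)); // 30 values: 0..29
    }
}
```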

Cheers,
Aljoscha




Re: Values are missing, probably due parallelism?

Posted by Kostya Kulagin <kk...@gmail.com>.
Actually this is not true - the source emits 30 values since it starts
with 0. If I change 29 to 33 the result will be the same.
I can get all values if I play with parallelism, i.e. by setting the
parallelism to 1 before print.
Or if I change 29 to 39 (I have 4 cores).
My guess is that there is something wrong with threads. BTW, in this case
how are threads created and how does data flow between them?

Re: Values are missing, probably due parallelism?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
this is related to your other question about count windows. The source
emits 29 values so we only have two count-windows with 10 elements each.
The last window is never triggered.
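
A rough plain-Java model of this (no Flink involved; the class and method
names are made up for illustration) buffers values until a window of 10 is
full, applies the same "multiple of 3" filter as the apply() function in
the question, and drops whatever is still buffered at the end:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

class CountWindowSketch {

    // Models a single purging count window of size `windowSize` over the
    // values 0..elementCount-1 and returns the multiples of 3 it passes on.
    static List<Long> emitted(long elementCount, int windowSize) {
        List<Long> out = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        LongStream.range(0, elementCount).forEach(value -> {
            buffer.add(value);
            if (buffer.size() == windowSize) {   // fires only on a full window
                for (Long v : buffer) {
                    if (v % 3 == 0) {
                        out.add(v);
                    }
                }
                buffer.clear();                  // purge after firing
            }
        });
        // Values left in the buffer belong to a window that never fired.
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emitted(29, 10)); // [0, 3, 6, 9, 12, 15, 18]
        System.out.println(emitted(30, 10)); // additionally contains 21, 24, 27
    }
}
```

With 29 values, the elements 20..28 stay in the third, incomplete window, so
21, 24 and 27 are never printed.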

Cheers,
Aljoscha
