Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/10/14 18:56:39 UTC

Doubts about parallelism

Hi,
I read the docs on parallelism, parallel execution and job scheduling, but
I still have some doubts about parallelism.

1. 
In my first try I did not set parallelism in my code and commented out the
parallelism.default key in the flink-conf file. In this case I assumed that
Flink sets the parallelism automatically on a per-operator basis. Is this
assumption correct?

2.
In a second try I again left parallelism unset in my code, but I set
parallelism.default: 2 in the flink-conf file.
In my code I have some sources, some sinks and two custom functions from an
external library supported by Flink. These don’t have a setParallelism()
method, so I can’t set a specific parallelism for them.
When I tried to execute the job I got the following error:

/java.lang.UnsupportedOperationException: Forward partitioning does not
allow change of parallelism. Upstream operation: Learn-11 parallelism: 1,
downstream operation: Select-13 parallelism: 3 You must use another
partitioning strategy, such as broadcast, rebalance, shuffle or global./

This leads me to the second question. Am I constrained to set
parallelism.default: 1 to respect the parallelism of the “learn” method? In
that case I would need to set the parallelism on each Flink operator (for
example 2) and leave the “select” parallelism at the default value (1),
since I can’t set a specific parallelism on it (I can’t set 3 as shown in
the error).

Moreover, I searched the docs a lot for the relation between partitioning
and parallelism, but everything I read is a bit unclear to me. Can you
explain it better?

Re: Doubts about parallelism

Posted by Tony Wei <to...@gmail.com>.
Hi Andrea,

For your first question, I think you are right, but the parallelism Flink
applies is the default value of `parallelism.default` in flink-conf.yaml,
which is 1 when the key is not set. [1]
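
To make the fallback order concrete, here is a minimal sketch in plain Java. It is only a model of the documented precedence (operator setting, then execution environment setting, then `parallelism.default`, then 1), not Flink's actual code. The class and method names are hypothetical, and the client-level `-p` option is left out for brevity.

```java
import java.util.OptionalInt;

// Hypothetical model of how an operator's parallelism is chosen.
// This is NOT Flink code; it only illustrates the precedence order.
public class ParallelismResolution {

    // Value used when parallelism.default is absent from flink-conf.yaml.
    static final int BUILT_IN_DEFAULT = 1;

    static int effectiveParallelism(OptionalInt operatorLevel,
                                    OptionalInt environmentLevel,
                                    OptionalInt configDefault) {
        // 1. An explicit setParallelism() on the operator wins.
        if (operatorLevel.isPresent()) {
            return operatorLevel.getAsInt();
        }
        // 2. Otherwise the execution environment's setting applies.
        if (environmentLevel.isPresent()) {
            return environmentLevel.getAsInt();
        }
        // 3. Otherwise parallelism.default, or 1 if the key is commented out.
        return configDefault.orElse(BUILT_IN_DEFAULT);
    }

    public static void main(String[] args) {
        // First try from the question: nothing set anywhere.
        System.out.println(effectiveParallelism(
                OptionalInt.empty(), OptionalInt.empty(), OptionalInt.empty()));
        // Second try: only parallelism.default: 2 in the config file.
        System.out.println(effectiveParallelism(
                OptionalInt.empty(), OptionalInt.empty(), OptionalInt.of(2)));
    }
}
```

Under this model, an operator that fixes its own parallelism internally is unaffected by `parallelism.default`, which would explain how two neighbouring operators can end up with different values.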

For your second question, I guess you use the `forward` function between
the "learn" and "select" methods. Am I right?
That exception is expected behavior: `forward` sends elements to the local
subtask of the next operation, so it requires the upstream and the
downstream operation to have the same parallelism. [2]
That is why the exception advises you to change your partitioning strategy
to broadcast, rebalance, shuffle, or global, so that "learn" and "select"
can run with different parallelism settings.
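
The difference between the two strategies can be sketched in plain Java. This is a simplified model with hypothetical names, not Flink's implementation. Forward maps subtask i to downstream subtask i, which only works when both sides have the same number of subtasks. Rebalance is modeled here as a plain round-robin over the downstream subtasks.

```java
// Hypothetical, simplified model of two partitioning strategies.
// This is NOT Flink code; it only shows why forward needs equal parallelism.
public class PartitioningSketch {

    // Forward: upstream subtask i hands records only to downstream subtask i.
    static int forwardTarget(int upstreamSubtask,
                             int upstreamParallelism,
                             int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism: "
                            + upstreamParallelism + " -> " + downstreamParallelism);
        }
        return upstreamSubtask;
    }

    // Rebalance: records are spread round-robin over all downstream subtasks,
    // so the upstream parallelism does not matter.
    static int rebalanceTarget(long recordIndex, int downstreamParallelism) {
        return (int) (recordIndex % downstreamParallelism);
    }

    public static void main(String[] args) {
        // Upstream parallelism 1, downstream parallelism 3, as in the error.
        for (long record = 0; record < 6; record++) {
            System.out.println("record " + record + " -> downstream subtask "
                    + rebalanceTarget(record, 3));
        }
        try {
            forwardTarget(0, 1, 3);
        } catch (UnsupportedOperationException e) {
            System.out.println("forward rejected: " + e.getMessage());
        }
    }
}
```

In an actual job this would correspond to calling `rebalance()` on the upstream DataStream before applying the next operator, assuming the library exposes the stream between the two steps.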

Hope this will help you.

Best Regards,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#common-options
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#forward--
