You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jerry Peng <je...@gmail.com> on 2017/08/18 21:08:27 UTC

Question about parallelism

Hello all,

I have a question about parallelism and partitioning in the
DataStreams API.  In Flink, a user can the parallelism of a data
source as well as operators.  So when I set the parallelism of a data
source e.g.

DataStream<String> text =
env.readTextFile(params.get("input")).setParallelism(5)

does this mean that the resulting "text" DataStream in going to be
partitioned into 5 partitions or does it mean that there are going to
be 5 parallel tasks that are going to run for this stage?

If the next operator is:

DataStream<Tuple2<String, Integer>> counts = text.flatMap(new
Tokenizer()).setParallelism(10)

and the parallelism is set to 10.  Are there 10 parallel tasks
consuming from the 5 partitions? and how is the resulting "counts"
DataStream partitioned? into 10 partitions?

Thanks in advance!

Best,

Jerry

Re: Question about parallelism

Posted by Till Rohrmann <tr...@apache.org>.
Hi Jerry,

you can set the global parallelism via the
ExecutionEnvironment#setParallelism. If you call setParallelism on an
operator, then it only changes the parallelism of this operator. The
parallelism of an operator means how many parallel instances of this
operator will be executed. Thus, it also means into how many partitions
(potentially infinite) your data stream will be partitioned.

If the parallelism changes between two operators, then there is a
re-partitioning of the data in a round-robin fashion across all parallel
subtasks of the succeeding operator.

Cheers,
Till
​

On Fri, Aug 18, 2017 at 11:12 PM, Jerry Peng <je...@gmail.com>
wrote:

> I guess my previous question is also asking if the parallelism is set
> for the operator or "data stream".  Is there implied repartitioning
> when the parallelism changes?
>
> On Fri, Aug 18, 2017 at 2:08 PM, Jerry Peng <je...@gmail.com>
> wrote:
> > Hello all,
> >
> > I have a question about parallelism and partitioning in the
> > DataStreams API.  In Flink, a user can the parallelism of a data
> > source as well as operators.  So when I set the parallelism of a data
> > source e.g.
> >
> > DataStream<String> text =
> > env.readTextFile(params.get("input")).setParallelism(5)
> >
> > does this mean that the resulting "text" DataStream in going to be
> > partitioned into 5 partitions or does it mean that there are going to
> > be 5 parallel tasks that are going to run for this stage?
> >
> > If the next operator is:
> >
> > DataStream<Tuple2<String, Integer>> counts = text.flatMap(new
> > Tokenizer()).setParallelism(10)
> >
> > and the parallelism is set to 10.  Are there 10 parallel tasks
> > consuming from the 5 partitions? and how is the resulting "counts"
> > DataStream partitioned? into 10 partitions?
> >
> > Thanks in advance!
> >
> > Best,
> >
> > Jerry
>

Re: Question about parallelism

Posted by Jerry Peng <je...@gmail.com>.
I guess my previous question is also asking if the parallelism is set
for the operator or "data stream".  Is there implied repartitioning
when the parallelism changes?

On Fri, Aug 18, 2017 at 2:08 PM, Jerry Peng <je...@gmail.com> wrote:
> Hello all,
>
> I have a question about parallelism and partitioning in the
> DataStreams API.  In Flink, a user can the parallelism of a data
> source as well as operators.  So when I set the parallelism of a data
> source e.g.
>
> DataStream<String> text =
> env.readTextFile(params.get("input")).setParallelism(5)
>
> does this mean that the resulting "text" DataStream in going to be
> partitioned into 5 partitions or does it mean that there are going to
> be 5 parallel tasks that are going to run for this stage?
>
> If the next operator is:
>
> DataStream<Tuple2<String, Integer>> counts = text.flatMap(new
> Tokenizer()).setParallelism(10)
>
> and the parallelism is set to 10.  Are there 10 parallel tasks
> consuming from the 5 partitions? and how is the resulting "counts"
> DataStream partitioned? into 10 partitions?
>
> Thanks in advance!
>
> Best,
>
> Jerry