Posted to dev@flink.apache.org by Giannis Evagorou <ge...@msn.com> on 2018/12/02 12:35:41 UTC

withPartitioner() vs calling partitionCustom() beforehand

Hi all,

I have a question regarding partitioning.

Does calling the withPartitioner() method on a coGroup operation have the same effect as calling partitionCustom() on both DataSets beforehand?
That is, is

    a.coGroup(b).where(…).equalTo(…).withPartitioner(…).with(…)

equivalent to:

    DataSet a = aa.partitionCustom(…)
    DataSet b = bb.partitionCustom(…)
    a.coGroup(b).where(…).equalTo(…).with(…)

Do both snippets perform the same low-level physical partitioning?

Thank you,
Giannis


Re: withPartitioner() vs calling partitionCustom() beforehand

Posted by Giannis Evagorou <ge...@msn.com>.
Hi Till,

 Thank you for your answer.

Giannis
________________________________
From: Till Rohrmann <tr...@apache.org>
Sent: Monday, December 3, 2018 1:46 PM
To: dev@flink.apache.org
Subject: Re: withPartitioner() vs calling partitionCustom() beforehand

Hi Giannis,

Logically, the resulting plans should be identical: both use the custom
partitioner to create the partitions and then coGroup both inputs.

Physically, the latter plan adds an additional partition operator before
the coGroup operator. You can see this if you call env.getExecutionPlan()
and then view the result in Flink's plan visualizer [1]. The partition
operator adds another task, which instantiates another thread. Consequently,
a.coGroup(b).where(...).equalTo(...).withPartitioner(...) should be slightly
more efficient.

[1] https://flink.apache.org/visualizer/

Cheers,
Till


Re: withPartitioner() vs calling partitionCustom() beforehand

Posted by Till Rohrmann <tr...@apache.org>.
Hi Giannis,

Logically, the resulting plans should be identical: both use the custom
partitioner to create the partitions and then coGroup both inputs.
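
To make the logical equivalence concrete, here is a minimal plain-Java sketch with no Flink dependency. It only models the routing decision: when the same partitioner is applied to the keys of both inputs, equal keys land in the same partition index, which is what a coGroup needs regardless of where in the plan the partitioning happens. The KeyPartitioner interface, the route() helper, and the modulo partitioner are hypothetical stand-ins for illustration (the interface mirrors the shape of Flink's Partitioner.partition(key, numPartitions)); none of this is Flink's actual runtime code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CustomPartitionSketch {

    /** Hypothetical stand-in with the same shape as Flink's Partitioner#partition. */
    interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Groups keys by the partition index the partitioner assigns to them. */
    static <K> Map<Integer, List<K>> route(List<K> keys, KeyPartitioner<K> partitioner,
                                           int numPartitions) {
        Map<Integer, List<K>> partitions = new TreeMap<>();
        for (K key : keys) {
            int target = partitioner.partition(key, numPartitions);
            partitions.computeIfAbsent(target, i -> new ArrayList<>()).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Assumed example partitioner: route integer keys by key mod numPartitions.
        KeyPartitioner<Integer> byModulo = (key, n) -> Math.floorMod(key, n);

        List<Integer> keysOfA = Arrays.asList(1, 2, 3, 4);
        List<Integer> keysOfB = Arrays.asList(2, 3, 5);

        // Equal keys from both inputs end up under the same partition index.
        System.out.println(route(keysOfA, byModulo, 2)); // prints {0=[2, 4], 1=[1, 3]}
        System.out.println(route(keysOfB, byModulo, 2)); // prints {0=[2], 1=[3, 5]}
    }
}
```

Keys 2 and 3 appear in both inputs and are routed to partitions 0 and 1 respectively in both cases, so the co-grouping can be done locally per partition. The sketch says nothing about the physical plan, which is where the two variants differ.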

Physically, the latter plan adds an additional partition operator before
the coGroup operator. You can see this if you call env.getExecutionPlan()
and then view the result in Flink's plan visualizer [1]. The partition
operator adds another task, which instantiates another thread. Consequently,
a.coGroup(b).where(...).equalTo(...).withPartitioner(...) should be slightly
more efficient.

[1] https://flink.apache.org/visualizer/

Cheers,
Till
