Posted to dev@flink.apache.org by Fridtjof Sander <fs...@mailbox.tu-berlin.de> on 2016/02/01 11:47:41 UTC

DataExchangeMode.BATCH in iterations

Dear Flink-Devs,

I recently ran into a problem where range-partitioning within iterations 
would be useful.

The PR for range-partitioning says that this does not work because
of some batched data-exchange mode.
https://github.com/apache/flink/pull/1255

I would like to understand the underlying issue, but could not find 
any articles, blog posts, etc. about it.

Do you have any pointers for me? Code is fine too, if the concept 
becomes clear from it.

Thanks for your time!

Best, Fridtjof

Re: DataExchangeMode.BATCH in iterations

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Flink has a non-batch way of breaking pipelines, which by now is quite
specific to iterations. It is used there for constructs that fork and
re-join the data flow.

The proper batch exchange is better, because the scheduler can exploit
it, but it is not yet usable in iterations.
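
As a generic illustration of the two exchange styles (plain Python, not Flink's runtime; the names `producer` and `consumer` are made up for this sketch), a pipelined exchange hands each record to the consumer as soon as it is produced, while a batch exchange finishes the producer before the consumer sees anything:

```python
def producer(n, log):
    """Produce n records lazily, logging each one as it is created."""
    for i in range(n):
        log.append(f"produce {i}")
        yield i

def consumer(records, log):
    """Consume records from any iterable, logging each one."""
    for r in records:
        log.append(f"consume {r}")

# Pipelined: the consumer pulls directly from the running producer,
# so production and consumption interleave.
pipelined_log = []
consumer(producer(2, pipelined_log), pipelined_log)

# Batch: list(...) materializes the full result first,
# so the producer is finished before the consumer starts.
batch_log = []
consumer(list(producer(2, batch_log)), batch_log)

print(pipelined_log)
print(batch_log)
```

In the batch case the producer is completely done before the consumer starts, so the two stages never need to run at the same time.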

Stephan



On Mon, Feb 1, 2016 at 1:04 PM, Fridtjof Sander <
fsander@mailbox.tu-berlin.de> wrote:

> Hi Fabian,
>
> thanks for your explanation!
>
> Yeah, I figured that if an easy fix exists, you would have done that
> yourself. This is more for me to understand the conceptual problem.
>
> But back to the pipeline requirement: Doesn't zipWithIndex violate that
> too, then? It's also a mapPartitions, collect + broadcast, plus another
> mapPartitions. This should roughly be the same procedure as building a
> histogram and propagating partition boundaries, right? Not much
> pipelining going on there. However, I had no problems with zipWithIndex
> inside iterations.
>
> Also, is there a difference between the "materialization" you mentioned
> and the execution of a datasink operator?
>
> Again, if all that is written somewhere, just throw me the link, I don't
> want to waste your time.
>
> Best
> Fridtjof

Re: DataExchangeMode.BATCH in iterations

Posted by Fridtjof Sander <fs...@mailbox.tu-berlin.de>.
Hi Fabian, 

thanks for your explanation!

Yeah, I figured that if an easy fix exists, you would have done that yourself. This is more for me to understand the conceptual problem.

But back to the pipeline requirement: Doesn't zipWithIndex violate that too, then? It's also a mapPartitions, collect + broadcast, plus another mapPartitions. This should roughly be the same procedure as building a histogram and propagating partition boundaries, right? Not much pipelining going on there. However, I had no problems with zipWithIndex inside iterations.
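
The three steps named above can be sketched in plain Python. This is not the Flink implementation: lists of lists stand in for parallel partitions, and the function names are hypothetical. The point is that the second pass cannot start before the counts of all partitions are known:

```python
from itertools import accumulate

def count_per_partition(partitions):
    """Pass 1 (first mapPartitions step): count the elements of each partition."""
    return [len(p) for p in partitions]

def start_offsets(counts):
    """Turn the collected counts into the first index of each partition.

    This list plays the role of the value that is collected and broadcast.
    """
    return [0] + list(accumulate(counts))[:-1]

def zip_with_index(partitions):
    """Assign consecutive, globally unique indices to all elements."""
    offsets = start_offsets(count_per_partition(partitions))
    # Pass 2 (second mapPartitions step): number elements from the offset on.
    return [[(offset + i, x) for i, x in enumerate(p)]
            for p, offset in zip(partitions, offsets)]

parts = [["a", "b"], ["c"], ["d", "e", "f"]]
indexed = zip_with_index(parts)
print(indexed)
```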

Also, is there a difference between the "materialization" you mentioned and the execution of a datasink operator?

Again, if all that is written somewhere, just throw me the link, I don't want to waste your time.

Best
Fridtjof

On 1 February 2016 at 12:21:21 CET, Fabian Hueske <fh...@gmail.com> wrote:
>Hi Fridtjof,
>
>the range partitioner works by building a histogram for the partitioning
>key. This requires a pass over the whole intermediate data set, which means
>it needs to be materialized and cannot be processed in a pipelined fashion.
>However, pipelined data exchange strategies are a requirement for the data
>flows which are executed for iteration bodies.
>
>This is not something that can be easily fixed at the moment. Touching this
>part of the runtime code would have major implications.
>I'm afraid we have to accept this restriction.
>
>Best, Fabian

Re: DataExchangeMode.BATCH in iterations

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Fridtjof,

the range partitioner works by building a histogram for the partitioning
key. This requires a pass over the whole intermediate data set, which means
it needs to be materialized and cannot be processed in a pipelined fashion.
However, pipelined data exchange strategies are a requirement for the data
flows which are executed for iteration bodies.
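
To illustrate why computing the ranges forces materialization, here is a minimal Python sketch. It is not Flink code, and the names `range_boundaries` and `assign_partition` are made up for this illustration. For simplicity it sorts all keys instead of building a sampled histogram, and it assumes at least as many keys as partitions. No record can be routed until the boundaries exist, and the boundaries need every key:

```python
from bisect import bisect_left

def range_boundaries(keys, num_partitions):
    """Compute num_partitions - 1 split points from ALL keys.

    This is the blocking step: it needs the complete input.
    """
    ordered = sorted(keys)  # full pass over the materialized data
    n = len(ordered)
    # Split point i is the largest key of the i-th equally sized bucket.
    return [ordered[(i * n) // num_partitions - 1]
            for i in range(1, num_partitions)]

def assign_partition(key, boundaries):
    """Route a key to the first range whose upper bound is >= key."""
    return bisect_left(boundaries, key)

keys = [42, 7, 19, 3, 88, 61, 25, 14]
boundaries = range_boundaries(keys, 4)
partitions = [assign_partition(k, boundaries) for k in keys]
print(boundaries)
print(partitions)
```

With these eight sample keys and four partitions, each partition receives two keys.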

This is not something that can be easily fixed at the moment. Touching this
part of the runtime code would have major implications.
I'm afraid we have to accept this restriction.

Best, Fabian

