Posted to dev@flink.apache.org by Marius Melzer <ma...@rasumi.net> on 2016/06/26 23:56:59 UTC

forward()

Hi,

I was reading some of the partitioning code and I have two questions
about the forward() method of DataStream:

1) I was previously under the impression that forwarding means sending
records between operators with the same parallelism from partition 1
to 1, 2 to 2, etc. But this doesn't seem to be the case, because
ForwardPartitioner#selectChannels returns new int[] {0}, which is,
interestingly, exactly the same code as in GlobalPartitioner.
2) Why is there a forward() statement at all? Isn't it the default
that messages are forwarded between corresponding partitions, or am I
getting something completely wrong here? If so, how does it work, and
what would be a good use case for the forward() statement?

Thanks,
Marius
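To make the observation in question 1 concrete, here is a small self-contained Java model. It is hypothetical: it does not use Flink's real StreamPartitioner or ChannelSelector classes, and the names PartitionerModel, SimplePartitioner, ForwardLike, GlobalLike and route are invented for illustration. It shows that if selectChannels() alone decided where records go, a partitioner that always returns new int[] {0} would send every record to downstream instance 0, exactly like the global partitioner.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model (not Flink's actual API): a partitioner picks
 * output channel indices for one record, given the number of downstream channels.
 */
public class PartitionerModel {

    interface SimplePartitioner {
        int[] selectChannels(Object record, int numberOfOutputChannels);
    }

    /** Mirrors the code quoted in the question: always selects channel 0. */
    static final class ForwardLike implements SimplePartitioner {
        public int[] selectChannels(Object record, int n) {
            return new int[] {0};
        }
    }

    /** Global-style partitioner: also always selects channel 0. */
    static final class GlobalLike implements SimplePartitioner {
        public int[] selectChannels(Object record, int n) {
            return new int[] {0};
        }
    }

    /** Counts how many records each downstream instance receives if selectChannels() decides. */
    static int[] route(SimplePartitioner p, int records, int downstreamParallelism) {
        int[] counts = new int[downstreamParallelism];
        for (int i = 0; i < records; i++) {
            for (int ch : p.selectChannels(i, downstreamParallelism)) {
                counts[ch]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Both lines print [10, 0, 0, 0]: judged by selectChannels() alone, they are identical.
        System.out.println(Arrays.toString(route(new ForwardLike(), 10, 4)));
        System.out.println(Arrays.toString(route(new GlobalLike(), 10, 4)));
    }
}
```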

Re: forward()

Posted by Till Rohrmann <tr...@apache.org>.
Maybe we could document this fact in the code base. I think this
behaviour is not very obvious.

Cheers,
Till

On Mon, Jun 27, 2016 at 11:24 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> you are right, this seems a bit strange. The reason why this works is that
> selectChannels of ForwardPartitioner is never called.
>
> In StreamingJobGraphGenerator.connect(), operators are connected by
> "physical" edges. If the ForwardPartitioner is set on a stream, this will
> choose the POINTWISE connection pattern, which does local forwarding from
> partition 1 to 1, 2 to 2, and so on. Only if another partitioner is set
> will the ALL_TO_ALL pattern be used. In that case, selectChannels() of the
> partitioner is actually called to decide where to send each record.
>
> Cheers,
> Aljoscha

Re: forward()

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
you are right, this seems a bit strange. The reason why this works is that
selectChannels of ForwardPartitioner is never called.

In StreamingJobGraphGenerator.connect(), operators are connected by
"physical" edges. If the ForwardPartitioner is set on a stream, this will
choose the POINTWISE connection pattern, which does local forwarding from
partition 1 to 1, 2 to 2, and so on. Only if another partitioner is set
will the ALL_TO_ALL pattern be used. In that case, selectChannels() of the
partitioner is actually called to decide where to send each record.

Cheers,
Aljoscha
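The decision described above can be sketched as a small self-contained Java model. This is an illustration under stated assumptions, not Flink's actual StreamingJobGraphGenerator code: the names ConnectModel, Pattern, SimplePartitioner, ForwardLike, RoundRobinLike, connect and targets are hypothetical, Pattern merely mirrors the POINTWISE and ALL_TO_ALL patterns named in the message, and upstream and downstream parallelism are assumed to be equal. A forward partitioner yields a POINTWISE edge where subtask i feeds subtask i without consulting selectChannels(); any other partitioner yields ALL_TO_ALL, and its selectChannels() picks the target.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not Flink's real code) of how the connection pattern
 * is chosen from the partitioner and how records are routed afterwards.
 * Assumes upstream and downstream operators have the same parallelism.
 */
public class ConnectModel {

    enum Pattern { POINTWISE, ALL_TO_ALL }

    interface SimplePartitioner {
        int[] selectChannels(Object record, int numberOfOutputChannels);
    }

    static final class ForwardLike implements SimplePartitioner {
        // In this model it is never consulted: forward edges are wired POINTWISE.
        public int[] selectChannels(Object record, int n) {
            return new int[] {0};
        }
    }

    static final class RoundRobinLike implements SimplePartitioner {
        private int next = -1;

        public int[] selectChannels(Object record, int n) {
            next = (next + 1) % n;
            return new int[] {next};
        }
    }

    /** Forward partitioner -> POINTWISE; any other partitioner -> ALL_TO_ALL. */
    static Pattern connect(SimplePartitioner p) {
        return (p instanceof ForwardLike) ? Pattern.POINTWISE : Pattern.ALL_TO_ALL;
    }

    /** Downstream subtask(s) reached by a record emitted from upstream subtask 'upstreamIndex'. */
    static int[] targets(SimplePartitioner p, Object record, int upstreamIndex, int parallelism) {
        if (connect(p) == Pattern.POINTWISE) {
            // Local forwarding: subtask i feeds subtask i; selectChannels() is bypassed.
            return new int[] {upstreamIndex};
        }
        // ALL_TO_ALL: the partitioner decides among all downstream subtasks.
        return p.selectChannels(record, parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 3;
        SimplePartitioner forward = new ForwardLike();
        for (int i = 0; i < parallelism; i++) {
            // Prints "0 -> [0]", "1 -> [1]", "2 -> [2]"
            System.out.println(i + " -> " + Arrays.toString(targets(forward, "r", i, parallelism)));
        }
        SimplePartitioner rr = new RoundRobinLike();
        System.out.println(Arrays.toString(targets(rr, "r", 0, parallelism))); // [0]
        System.out.println(Arrays.toString(targets(rr, "r", 0, parallelism))); // [1]
    }
}
```

In this model, the new int[] {0} in ForwardLike is harmless precisely because the POINTWISE branch never reaches it, which is the behaviour Till suggests documenting.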
