Posted to dev@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2015/05/25 10:26:13 UTC

[DISCUSS] Behaviour of startNewChain() in Streaming

Hi,
I think people will be confused by the behaviour of startNewChain() in
the streaming API. I myself had wrong assumptions about how it behaves
when I was writing a test job, and the only other job where someone not
coming from streaming wrote a streaming test (StreamCheckpointingITCase)
makes the same wrong assumptions. I'm not saying that's a stupid
mistake; I made the same mistake myself.

So what chains of operators should this snippet produce:

input
  .map(new Map1())
  .map(new Map2())
  .startNewChain()
  .map(new Map3())
  .print()

I would guess that your assumption about where the split in the chains
happens here is wrong. :D

Cheers,
Aljoscha

Re: [DISCUSS] Behaviour of startNewChain() in Streaming

Posted by Maximilian Michels <mx...@apache.org>.
I second Aljoscha's and Matthias' opinion on the behavior of
`startNewChain()`. In the case of `setParallelism(..)`, we set the
parallelism of the operator, but in the case of `startNewChain()`, we explicitly
start a new chain; for the user, this is not connected to the previous
operation, even though the programmer sees it being called on the operator
itself. However, if the method were instead named `breakChain()`, I'd be OK
with it.

On Mon, May 25, 2015 at 10:48 PM, Matthias J. Sax <
mjsax@informatik.hu-berlin.de> wrote:

> I agree with Aljoscha's argumentation. It would be more intuitive if
> "startNewChain()" split the chain where it is put.

Re: [DISCUSS] Behaviour of startNewChain() in Streaming

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
I agree with Aljoscha's argumentation. It would be more intuitive if
"startNewChain()" split the chain where it is put.


On 05/25/2015 10:48 AM, Aljoscha Krettek wrote:
> Yes, this is another example where it might be problematic but I think
> there are different ideas here: Methods such as setParallelism(),
> name() and so on can be seen as modifying the operation that was
> previously constructed. Methods such as groupBy() and startNewChain() can
> be seen as acting at that point in the topology: groupBy changes the
> partitioning/grouping of the operations coming afterwards,
> startNewChain() starts a new chain "after" the call.
> 
> I know that this is also just my opinion and other people could see it
> differently. This is a problem of our API, where the construction of
> an operation is not encapsulated but scattered across many different
> method calls.


Re: [DISCUSS] Behaviour of startNewChain() in Streaming

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, this is another example where it might be problematic but I think
there are different ideas here: Methods such as setParallelism(),
name() and so on can be seen as modifying the operation that was
previously constructed. Methods such as groupBy() and startNewChain() can
be seen as acting at that point in the topology: groupBy changes the
partitioning/grouping of the operations coming afterwards,
startNewChain() starts a new chain "after" the call.

I know that this is also just my opinion and other people could see it
differently. This is a problem of our API, where the construction of
an operation is not encapsulated but scattered across many different
method calls.
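
To make the two readings discussed in this thread concrete, here is a
minimal plain-Java sketch. It is a toy model, not Flink code:
ChainSplitSketch, split() and the attachToPrevious flag are hypothetical
names, and it assumes a linear pipeline in which every operator could
otherwise be chained. With attachToPrevious = true the marker is treated
as a property of the operator it is called on; with false it acts at its
position in the topology, so the new chain starts "after" the call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainSplitSketch {

    /** Marker token standing in for a startNewChain() call (illustrative only). */
    public static final String START_NEW_CHAIN = "startNewChain";

    /**
     * Splits a linear list of fluent calls into operator chains.
     *
     * attachToPrevious = true:  the marker belongs to the operator before it,
     *                           so the new chain begins AT that operator.
     * attachToPrevious = false: the marker acts at its position,
     *                           so the new chain begins at the NEXT operator.
     */
    public static List<List<String>> split(List<String> calls, boolean attachToPrevious) {
        List<List<String>> chains = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String call : calls) {
            if (!call.equals(START_NEW_CHAIN)) {
                current.add(call);
                continue;
            }
            if (attachToPrevious && !current.isEmpty()) {
                // Pull the most recent operator out and let it head the new chain.
                String last = current.remove(current.size() - 1);
                if (!current.isEmpty()) {
                    chains.add(current);
                }
                current = new ArrayList<>();
                current.add(last);
            } else if (!current.isEmpty()) {
                // Close the current chain; the next operator starts a new one.
                chains.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            chains.add(current);
        }
        return chains;
    }

    public static void main(String[] args) {
        List<String> calls =
            Arrays.asList("Map1", "Map2", START_NEW_CHAIN, "Map3", "print");
        System.out.println(split(calls, true));  // [[Map1], [Map2, Map3, print]]
        System.out.println(split(calls, false)); // [[Map1, Map2], [Map3, print]]
    }
}
```

For the snippet from the original mail, the first reading puts the split
before Map2 and the second puts it between Map2 and Map3.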

On Mon, May 25, 2015 at 10:37 AM, Gyula Fóra <gy...@apache.org> wrote:
> I see your point, but this is a general problem with any property that we
> set on the operators themselves. The same goes, for instance, for parallelism:
>
> input
>   .map(new Map1())
>   .setParallelism(2)
>   .map(new Map2())
>   .print()
>
> Do we change the parallelism after map 1 so it applies to map 2?
>
> Gyula

Re: [DISCUSS] Behaviour of startNewChain() in Streaming

Posted by Gyula Fóra <gy...@apache.org>.
I see your point, but this is a general problem with any property that we
set on the operators themselves. The same goes, for instance, for parallelism:

input
  .map(new Map1())
  .setParallelism(2)
  .map(new Map2())
  .print()

Do we change the parallelism after map 1 so it applies to map 2?

Gyula
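
As an illustration of the "property set on the operator itself" reading,
here is a small plain-Java sketch of a fluent builder. It does not use
Flink: FluentPropertySketch, Op and the default parallelism of 1 are
assumptions made for this example only. A setter called in the middle of
the method chain modifies the operator constructed by the preceding
call, not the one that follows.

```java
import java.util.ArrayList;
import java.util.List;

public class FluentPropertySketch {

    /** Hypothetical operator record: a name plus a parallelism setting. */
    public static final class Op {
        public final String name;
        public int parallelism = 1; // assumed default for this sketch

        Op(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name + "(p=" + parallelism + ")";
        }
    }

    private final List<Op> ops = new ArrayList<>();

    /** Appends a new operator to the pipeline. */
    public FluentPropertySketch map(String name) {
        ops.add(new Op(name));
        return this;
    }

    /** Configures the operator created by the preceding call. */
    public FluentPropertySketch setParallelism(int p) {
        if (ops.isEmpty()) {
            throw new IllegalStateException("no operator to configure");
        }
        ops.get(ops.size() - 1).parallelism = p;
        return this;
    }

    public List<Op> build() {
        return ops;
    }

    public static void main(String[] args) {
        List<Op> result = new FluentPropertySketch()
            .map("Map1")
            .setParallelism(2)
            .map("Map2")
            .build();
        System.out.println(result); // [Map1(p=2), Map2(p=1)]
    }
}
```

Under this model the setting lands on Map1, and Map2 keeps the default.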

