Posted to user@flink.apache.org by Garrett Barton <ga...@gmail.com> on 2017/10/02 13:46:30 UTC
At end of complex parallel flow, how to force end step with parallel=1?
I have a complex algorithm implemented with the DataSet API, and by default it
runs with parallelism 90 for good performance. At the end I want to perform a
clustering of the resulting data, and to do that correctly I need to pass
all the data through a single thread/process.
I read in the docs that as long as I did a global reduce using
DataSet.reduceGroup(new GroupReduceFunction....) it would be forced onto
a single thread. Yet when I run the flow and bring it up in the UI, I see
parallelism 90 all the way through the DAG, including for this operator.
Is there a config or feature to force the flow back to a single thread? Or
should I just split this into two completely separate jobs? I'd rather not
split, as I would like to use Flink's ability to iterate on this algorithm-and-clustering
combo.
Thank you
Re: At end of complex parallel flow, how to force end step with parallel=1?
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Garrett,
thanks for reporting back!
Glad you could resolve the issue :-)
Best, Fabian
2017-10-05 23:21 GMT+02:00 Garrett Barton <ga...@gmail.com>:
Re: At end of complex parallel flow, how to force end step with parallel=1?
Posted by Garrett Barton <ga...@gmail.com>.
Fabian,
Turns out I was wrong: my flow was in fact running as two separate jobs
because I was using a local variable calculated by
...distinct().count() in a downstream flow. The second flow did set
parallelism correctly! Thank you for the help. :)
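For context on why this splits the program: in the DataSet API, DataSet.count() (like collect()) eagerly executes the plan up to that point, so operators defined afterwards run as a second job. The behavior is loosely analogous to a terminal operation on a java.util.stream pipeline, which consumes the pipeline the moment it is called (a stdlib analogy only, not Flink code):

```java
import java.util.stream.Stream;

public class EagerCount {
    public static void main(String[] args) {
        Stream<String> s = Stream.of("a", "b", "a");

        long n = s.distinct().count(); // terminal op: the pipeline executes here
        System.out.println(n);

        // The pipeline has been consumed; anything defined "downstream"
        // would need a fresh pipeline (in Flink terms, a second job).
        try {
            s.forEach(System.out::println);
        } catch (IllegalStateException e) {
            System.out.println("stream already consumed");
        }
    }
}
```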
On Wed, Oct 4, 2017 at 8:01 AM, Fabian Hueske <fh...@gmail.com> wrote:
Re: At end of complex parallel flow, how to force end step with parallel=1?
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Garrett,
that's strange: DataSet.reduceGroup() creates a non-parallel GroupReduce
operator, so even without manually setting its parallelism to 1, the
operator should not run in parallel.
What might happen, though, is that a combiner is applied to pre-reduce the
data locally before it is shipped to the single instance.
Does your GroupReduceFunction implement a combiner interface?
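(Fabian's combiner point is why the UI can legitimately show parallel instances even for a "global" reduce: the combine phase pre-aggregates each partition locally, and only the combined partials reach the single final reducer. A stdlib-only sketch of that two-phase shape, purely illustrative and not Flink's API:)

```java
import java.util.Arrays;
import java.util.List;

public class CombineThenReduce {
    // Combine phase: each "parallel instance" pre-reduces its local partition.
    static long combine(List<Integer> partition) {
        return partition.stream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5),
                Arrays.asList(6, 7, 8, 9));

        // The combiners can run in parallel, one per partition...
        long[] partials = partitions.parallelStream()
                .mapToLong(CombineThenReduce::combine)
                .toArray();

        // ...but the final reduce only ever sees the partials, on one thread.
        long total = Arrays.stream(partials).sum();
        System.out.println(total);
    }
}
```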
I'm not aware of any visualization problems in the web UI.
Could you share a screenshot of the UI showing the issue?
Thanks, Fabian
2017-10-03 21:57 GMT+02:00 Garrett Barton <ga...@gmail.com>:
Re: At end of complex parallel flow, how to force end step with parallel=1?
Posted by Garrett Barton <ga...@gmail.com>.
Gábor,
Thank you for the reply. I gave that a go, and the flow still showed
parallelism 90 for each step. Is the UI not 100% accurate, perhaps?
To get around it for now, I implemented a partitioner that sends all the
data to the same partition. A hack, but it works!
On Tue, Oct 3, 2017 at 4:12 AM, Gábor Gévay <gg...@gmail.com> wrote:
Re: At end of complex parallel flow, how to force end step with parallel=1?
Posted by Gábor Gévay <gg...@gmail.com>.
Hi Garrett,
You can call .setParallelism(1) on just this operator:
ds.reduceGroup(new GroupReduceFunction...).setParallelism(1)
Best,
Gabor
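The shape Gábor suggests — run the expensive stages with high parallelism, then funnel everything through one non-parallel final step — can be illustrated with plain Java (stdlib only; this sketches the execution shape, not Flink's actual runtime, and the squaring step is just a stand-in for the real algorithm):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SingleThreadFinalStep {
    public static void main(String[] args) {
        // Parallel stage: per-record work spread across many threads
        // (analogous to the DataSet operators running with parallelism 90).
        List<Integer> processed = IntStream.rangeClosed(1, 1000)
                .parallel()
                .map(x -> x * x)          // stand-in for the complex algorithm
                .boxed()
                .collect(Collectors.toList());

        // Final stage: all records pass through a single thread
        // (analogous to reduceGroup(...).setParallelism(1)).
        long sum = processed.stream()      // sequential on purpose
                .mapToLong(Integer::longValue)
                .sum();

        System.out.println(sum);
    }
}
```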
On Mon, Oct 2, 2017 at 3:46 PM, Garrett Barton <ga...@gmail.com> wrote: