Posted to user@flink.apache.org by Urs Schoenenberger <ur...@tngtech.com> on 2017/09/05 10:30:03 UTC

DataSet: partitionByHash without materializing/spilling the entire partition?

Hi all,

we have a DataSet pipeline which reads CSV input data and then
essentially does a combinable GroupReduce via first(n).

In our first iteration (readCsvFile -> groupBy(0) -> sortGroup(0) ->
first(n)), we got a job graph like this:

source --[Forward]--> combine --[Hash Partition on 0, Sort]--> reduce

This works, but we found the combine phase to be inefficient because not
enough combinable elements fit into a sorter. My idea was to
pre-partition the DataSet to increase the chance of combinable elements
ending up in the same sorter
(readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)).
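
A rough sketch of that second variant in code (field types, path, and n=3 are
just an illustration, not our actual job):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<String, Integer>> in = env
        .readCsvFile("/tmp/test").types(String.class, Integer.class);

    in.partitionByHash(0)
        .groupBy(0)
        .sortGroup(0, Order.ASCENDING)
        .first(3)
        .output(new DiscardingOutputFormat<>());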

To my surprise, I found that this changed the job graph to

source --[Hash Partition on 0]--> partition(noop) --[Forward]--> combine
--[Hash Partition on 0, Sort]--> reduce

which materializes and spills the entire partitions at the
partition(noop) operator!

Is there any way I can partition the data on the way from source to
combine without spilling? That is, can I get a job graph that looks like


source --[Hash Partition on 0]--> combine --[Hash Partition on 0,
Sort]--> reduce

instead?

Thanks,
Urs

-- 
Urs Schönenberger - urs.schoenenberger@tngtech.com

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: DataSet: partitionByHash without materializing/spilling the entire partition?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Billy,

a program that is defined as

Dataset -> Map -> Filter -> Map -> Output

should not spill at all.
There is an unnecessary serialization/deserialization step between the last
map and the sink, but there shouldn't be any spilling to disk.
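
For illustration, a chain of that shape (path and functions are placeholders,
just to show the fully pipelined case) would be something like:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> input = env.readTextFile("hdfs:///path/to/input");
    input.map(s -> s.trim())
         .filter(s -> !s.isEmpty())
         .map(s -> s.toUpperCase())
         .writeAsText("hdfs:///path/to/output");

All of these operators should be pipelined and never touch disk.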

As I said in my response to Urs, spilling should only happen in a few cases:

- full sort with insufficient memory
- hash-tables that need to spill (only in join operators)
- range partitioning to compute a histogram of the partitioning keys.
- temp nodes to avoid deadlocks. These can occur in plans that branch and
join later like the following:

              /--- Map ---\
Input --<                   JOIN --- Output
              \--- Map ---/

The first two should not be surprising, but the last one is usually
unexpected.
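
A minimal sketch of that branch-and-join shape (types and functions are made
up, purely to show the structure):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, Long>> input = env.fromElements(
        Tuple2.of(1L, 10L), Tuple2.of(2L, 20L), Tuple2.of(3L, 30L));

    // two branches over the same input ...
    DataSet<Tuple2<Long, Long>> left = input
        .map(t -> Tuple2.of(t.f0, t.f1 + 1))
        .returns(new TypeHint<Tuple2<Long, Long>>() {});
    DataSet<Tuple2<Long, Long>> right = input
        .map(t -> Tuple2.of(t.f0, t.f1 * 2))
        .returns(new TypeHint<Tuple2<Long, Long>>() {});

    // ... that are joined again downstream; to avoid a pipeline deadlock,
    // the optimizer may place a temp barrier on one of the branches
    left.join(right).where(0).equalTo(0).print();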

Can you share a bit more information about your optimization of rewriting

        Dataset -> Map -> [FilterT -> CoGroup; FilterF] -> Map -> Output

to

        Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
        Dataset -> Map -> FilterF -> Map -> Output

I did not completely understand the structure of the first job. Is it
branching and merging again?
Maybe you can share the JSON plan (ExecutionEnvironment.getExecutionPlan())?

Thanks, Fabian

Re: DataSet: partitionByHash without materializing/spilling the entire partition?

Posted by Fabian Hueske <fh...@gmail.com>.
Btw, not sure if you know that you can visualize the JSON plan returned by
ExecutionEnvironment.getExecutionPlan() on the website [1].

Best, Fabian

[1] http://flink.apache.org/visualizer/
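
For example (just a sketch), the JSON can be printed from the driver program
and pasted into the visualizer:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // ... define sources, transformations, and sinks here ...
    System.out.println(env.getExecutionPlan());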


Re: DataSet: partitionByHash without materializing/spilling the entire partition?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Urs,

thanks for the additional details and plans you provided.
Regarding your questions:

Q1: Does the moved sort affect the spilling behavior?
Yes, it might. If you look at the one-output.png plan, you'll notice
that it says "Sort (combining)" whereas the two-outputs.png plan only says
"Sort". The difference is that "Sort (combining)" applies a combiner before
the data is spilled to disk, so it might decrease the amount of spilled
data. To be honest, I don't know why the optimizer moves the sort when a
second output is added. This would require a bit of optimizer debugging...

Q2: Is the given program correct?
Yes, this looks good.
However, I thought about this again and I'm not sure if it makes sense to
use a dedicated combiner in your case.
A combiner is usually used to reduce the amount of data that is shuffled
over the network at the cost of a partial sort. In your case, there are not
enough duplicates to significantly reduce the amount of data. Hence, you'd
like to apply the reducer after the shuffle (partitioning), which "only"
reduces the amount of data to spill during sorting. However, this is
already done by the "Sort (combining)" local strategy. By adding a
dedicated combiner with ".combineGroup()" we add a second partial sort +
combine phase. The first combiner is the combine operator and the second
combiner is the one applied during the full sort for the reduce. Depending
on the number of duplicates, the second combiner might further reduce the
amount of data, but it also raises the question of how much benefit the
first combiner actually provides.

So, I would run a small benchmark to check if the additional combiner
actually reduces the runtime. The combiner in "Sort (combining)" might
actually suffice.
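
A rough way to compare the two variants (reusing the pipeline from your
snippet; path, names, and n=3 are placeholders, and getNetRuntime() only gives
a coarse signal):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<String, Integer>> in = env
        .readCsvFile("/tmp/test").types(String.class, Integer.class);

    // Variant A: explicit combineGroup before the final reduce
    in.partitionByHash(0)
        .groupBy(0).sortGroup(1, Order.ASCENDING)
        .combineGroup(new FirstReducer<Tuple2<String, Integer>>(3))
        .withForwardedFields("f0")
        .groupBy(0).sortGroup(1, Order.ASCENDING)
        .reduceGroup(new FirstReducer<Tuple2<String, Integer>>(3))
        .output(new DiscardingOutputFormat<Tuple2<String, Integer>>());
    // each execute() call runs the sinks defined since the previous execution
    long withCombiner = env.execute("first-n with dedicated combiner").getNetRuntime();

    // Variant B: rely on the combining sort of the final reduce only
    in.partitionByHash(0)
        .groupBy(0).sortGroup(1, Order.ASCENDING)
        .first(3)
        .output(new DiscardingOutputFormat<Tuple2<String, Integer>>());
    long withoutCombiner = env.execute("combining sort only").getNetRuntime();

    System.out.println("with combiner: " + withCombiner + " ms, "
        + "without: " + withoutCombiner + " ms");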

Best, Fabian



Re: DataSet: partitionByHash without materializing/spilling the entire partition?

Posted by Urs Schoenenberger <ur...@tngtech.com>.
Hi Fabian,

thanks a lot for your thorough reply. It turns out that I was
mostly confused by the generated plans and there does not seem
to be "unneccessary" spilling at the operator level, but rather
an omission of the combiner (this is the same as FLINK-3179,
right?), for which you provided a good workaround!

In case anyone is wondering what was really going on in my case:

The example I gave in my first message is actually
not enough to reproduce the behaviour I'm describing - sorry!

The reproducible setup should start as follows:

DataSet<Tuple2<String, Integer>> in = env
    .readCsvFile("/tmp/test").types(String.class, Integer.class);
in.partitionByHash(0)
    .groupBy(0)
    .sortGroup(1, Order.ASCENDING)
    .first(3)
    .output(new DiscardingOutputFormat<>());


(the sorting on the non-key field obviously complicates matters.)

This produces the plan attached as one-output.png/one-output.json,
and predictably spills at the reducer when executed since it requires
a full sort.

The "unexpected" spilling happened as soon as I added a second sink:

in.output(new DiscardingOutputFormat<>());

Now, the plan changes to the one shown at two-outputs.png/
two-outputs.json. It now spills in front of the partition operator,
but I was missing that this is because the optimizer moved the
sort in front of the partitioner instead! There's no real difference
in terms of the spilling/computation behaviour, right? So it's all
good so far: we're not really spilling because of the no-op partitioner,
but rather moving where the sort happens.

Just to make sure, the workaround for applying the combiner should
look like this for my case, right?

        in.partitionByHash(0)
                .groupBy(0)
                .sortGroup(1, Order.ASCENDING)
                .combineGroup(new FirstReducer<>(3))
                .withForwardedFields("f0")
                .groupBy(0)
                .sortGroup(1, Order.ASCENDING)
                .reduceGroup(new FirstReducer<>(3))


Again, thank you very much for your support!

Best,
Urs

--
Urs Schönenberger - urs.schoenenberger@tngtech.com

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: DataSet: partitionByHash without materializing/spilling the entire partition?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Urs,

a hash-partition operator should not spill. In general, DataSet plans aim
to be pipelined as much as possible.
There are a few cases when spilling happens:

- full sort with insufficient memory
- hash-tables that need to spill (only in join operators)
- range partitioning to compute a histogram of the partitioning keys.
- temp nodes to avoid deadlocks. These can occur in plans that branch and
join later like the following:

              /--- Map ---\
Input --<                   JOIN --- Output
              \--- Map ---/


A simple plan without branching, such as the one you posted,
   readCsvFile -> partitionBy(0) -> groupBy(0) -> sortGroup(0) -> first(n)
has no reason to spill except for the full sort that is required for the
final aggregation.

Can you share the execution plan that you get for the program
(ExecutionEnvironment.getExecutionPlan())?

Btw, the sortGroup(0) call is superfluous because it would sort a group
where all 0-fields are the same on the 0-field.
I believe Flink's optimizer automatically removes that, so it does not
impact the performance.
Sorting on another field would indeed make sense, because it would
determine the order within a group and hence which records are forwarded by
first(n).
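
For example (illustrative only, with data and field 1 as in the snippet below):

    data.groupBy(0)
        .sortGroup(1, Order.ASCENDING)  // order within each group by field 1
        .first(3);                      // keeps the 3 smallest f1 values per key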

In order to force a combiner on a partitioned data set, you can do the
following:

--------

public static void main(String[] args) throws Exception {

   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

   // randData(env) is a helper (not shown here) that provides a
   // DataSet<Tuple2<Long, Long>> of test data.
   DataSet<Tuple2<Long, Long>> data = randData(env);

   DataSet<Tuple2<Long, Long>> result = data.partitionByHash(0)
      // the combiner runs on the partitioned data; declaring "f0" as a
      // forwarded field tells the optimizer the hash partitioning on field 0
      // is preserved, so the final groupBy(0) needs no new shuffle
      .groupBy(0).combineGroup(new First3())
         .withForwardedFields("f0")
      .groupBy(0).reduceGroup(new First3());

   result.print();
}

// Emits at most the first three records of each group; used both as the
// combiner and as the final group reducer.
public static class First3 implements
   GroupCombineFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>,
   GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

   @Override
   public void combine(Iterable<Tuple2<Long, Long>> values,
         Collector<Tuple2<Long, Long>> out) throws Exception {
      reduce(values, out);
   }

   @Override
   public void reduce(Iterable<Tuple2<Long, Long>> values,
         Collector<Tuple2<Long, Long>> out) throws Exception {
      int i = 0;
      for (Tuple2<Long, Long> v : values) {
         out.collect(v);
         i++;
         if (i == 3) {
            break;
         }
      }
   }
}

--------

The generated plan will
- hash partition the input data
- partially sort the data in memory on the first field (not going to disk)
- invoke the combiner for each in-memory sorted group
- locally forward the data (because of the forwarded field information [1])
- fully sort the data
- invoke group reducer for each group

In this plan, the only spilling should happen in the sort for the final
aggregation.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#semantic-annotations



RE: DataSet: partitionByHash without materializing/spilling the entire partition?

Posted by "Newport, Billy" <Bi...@gs.com>.
We have the same issue. We are finding that we cannot express the data flow in a natural way because of unnecessary spilling. Instead, we're making our own operators which combine multiple steps together and essentially hide them from Flink, or sometimes we even have to read an input dataset once per flow to avoid spilling. The performance improvements are dramatic, but it's kind of reducing Flink to a thread scheduler rather than a data flow engine, because we basically cannot express the flow to Flink. This worries us because if we let others write Flink code using our infra, we'll be spending all our time collapsing their flows into much simpler but less intuitive flows to prevent Flink from spilling.

This also means higher-level APIs such as the Table API or Beam are off the table, because they prevent us from optimizing in this manner.

We already have prior implementations of the logic we are implementing in Flink, and as a result we know the Flink version is much less efficient than those prior implementations, which is giving us pause about rolling it out more broadly. We're afraid of the Flink tax, both from a performance point of view and from a usability point of view, given that naïve flows are not performant without significant collapsing.

For example, we see spilling here:

        Dataset -> Map -> Filter -> Map -> Output

We're trying to combine the Map -> Output into the filter operation now, so that the records which are not passed through get written to an output file during the Filter.
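
Very roughly, the kind of filter we mean looks like this (all names, the predicate, and the per-subtask file path are made up for illustration):

    public static class FilterAndDumpRejected extends RichFilterFunction<Tuple2<String, Integer>> {

        private transient BufferedWriter rejectedOut;

        @Override
        public void open(Configuration parameters) throws Exception {
            // one side file per parallel subtask
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            rejectedOut = new BufferedWriter(new FileWriter("/tmp/rejected-" + subtask));
        }

        @Override
        public boolean filter(Tuple2<String, Integer> value) throws Exception {
            boolean keep = value.f1 > 0;  // stand-in for the real predicate
            if (!keep) {
                // write the record we are about to drop to the side file
                rejectedOut.write(value.toString());
                rejectedOut.newLine();
            }
            return keep;
        }

        @Override
        public void close() throws Exception {
            if (rejectedOut != null) {
                rejectedOut.close();
            }
        }
    }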


Or in this case

        Dataset -> Map -> [FilterT -> CoGroup; FilterF] -> Map -> Output

Rewriting as

        Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
        Dataset -> Map -> FilterF -> Map -> Output

That is, two separate flows are several times faster, even though it means reading the file twice rather than once.

This is all pretty unintuitive and makes using Flink pretty difficult for us, never mind our users. Writing the Flink dataflows in a naïve way is fast, but getting them to run with acceptable efficiency requires obscure workarounds and collapsing, and that takes the bulk of our time, which is a shame and the main reason we don't want to push it out for general use yet.

It seems like Flink badly needs a flow rewriter which is capable of rewriting a naïve flow into combined operators or restructured flows automatically. We're doing it by hand right now, but there has to be a better way.

It's a shame really, it's so close.

Billy

