Posted to user@crunch.apache.org by Andrey Gusev <an...@siftscience.com> on 2015/12/10 03:05:15 UTC

Secondary sort and partitioning in Spark

Hello crunch!

I am running into problems with the partitioning of groups under secondary
sort when running on a SparkPipeline.

What I am observing is that records belonging to a single group may be
split across two or more invocations of the DoFn. This could be a gap in my
understanding of Spark's execution model with respect to locality - and if
so, can *all* the records belonging to a groupBy key be forced into a
single call?

Roughly speaking, the code looks like this:

PTableType<GroupByKey, Pair<SortKey, Info>> pType =
    tableOf(Writables.writables(GroupByKey.class),
            Writables.pairs(Writables.writables(SortKey.class),
                            Writables.writables(Info.class)));

// note that dataset has been explicitly sharded by numPartitions
PTable<GroupByKey, Pair<SortKey, Info>> infos =
    dataset.parallelDo(..., pType);

PTable<SortKey, Info> mergedInfos =
    SecondarySort.sortAndApply(infos, mergeInfos(...), mergeType, numPartitions);

static class GroupByKey implements Writable {
  public int treeId;
  public int nodeId;
  ...
}
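
In case it matters, here is a minimal sketch of what the full key looks
like; the field names are from the real class, the method bodies are
illustrative. hashCode/equals are kept consistent with the serialized
fields, since hash partitioning routes records by the key's hashCode:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

static class GroupByKey implements Writable {

  public int treeId;
  public int nodeId;

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(treeId);
    out.writeInt(nodeId);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    treeId = in.readInt();
    nodeId = in.readInt();
  }

  // hashCode/equals agree with the serialized fields so that hash
  // partitioning can send every record of a group to the same partition.
  @Override
  public int hashCode() {
    return 31 * treeId + nodeId;
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof GroupByKey)) {
      return false;
    }
    GroupByKey that = (GroupByKey) other;
    return treeId == that.treeId && nodeId == that.nodeId;
  }
}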

I can confirm that records come in sorted and grouped, but I am also
observing that a single group may be processed on different nodes. More
concretely, say the group belonging to treeId=0, nodeId=0 has 100 records:
the first 30 may show up on node1 and the remaining 70 on node2 (sorted in
both cases). Informally, it looks as if each node is simply scheduled to
process the same number of records. This is especially evident with 2
partitions, where exactly one group is split.

The semantics of the code (at least for now) require all the values for a
key to arrive in a single group. Can that be enforced?
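
For reference, the shape of the merge function under
SecondarySort.sortAndApply is roughly the following; mergeInfos itself is
not shown here, so the body is purely illustrative of the contract the code
assumes (one process() call per key, covering all of that key's sorted
values):

DoFn<Pair<GroupByKey, Iterable<Pair<SortKey, Info>>>, Pair<SortKey, Info>> mergeFn =
    new DoFn<Pair<GroupByKey, Iterable<Pair<SortKey, Info>>>, Pair<SortKey, Info>>() {
      @Override
      public void process(Pair<GroupByKey, Iterable<Pair<SortKey, Info>>> input,
                          Emitter<Pair<SortKey, Info>> emitter) {
        // If a group is split across partitions, process() runs more than
        // once for the same GroupByKey, and merge logic that assumes it
        // sees the whole group at once silently breaks.
        for (Pair<SortKey, Info> value : input.second()) {
          // ... accumulate merge state over the sorted values, emit results ...
        }
      }
    };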

env: Spark 1.5 and Crunch 0.11.0

Any thoughts would be appreciated!

Re: Secondary sort and partitioning in Spark

Posted by Andrey Gusev <an...@siftscience.com>.
Yeah, that appears to be the symptom. I tried both 0.13 and a fresh 0.14
build from source, but the problem is present in both cases.

When I use 2 partitions and, say, 100 total groups, I see the first
partition starting at group 0 and running through group 47, ending at sort
field index 344. Roughly, the log line reads:

groupId: 47, sortFieldIndex: 344

The next partition, executing on a different node, starts at:

groupId: 47, sortFieldIndex: 345

So the group is broken up across the two partitions.

I do see that PartitionedMapOutputFunction is used in PGroupedTableImpl
(Spark), so it looks like the CRUNCH-556 fix
<https://issues.apache.org/jira/browse/CRUNCH-556> isn't sufficient.
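
As a sanity check that the split even follows hash partitioning, one way is
to log the partition each key ought to map to under plain modulo hashing.
partitionFor below is a hypothetical helper for illustration, not a Crunch
API:

// Hypothetical helper (not a Crunch API): the partition a key would get
// under plain modulo hash partitioning.
static int partitionFor(GroupByKey key, int numPartitions) {
  return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}

Logging this next to groupId/sortFieldIndex shows whether the observed
split contradicts the expected key-to-partition assignment.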

On Wed, Dec 9, 2015 at 8:56 PM, Josh Wills <jo...@gmail.com> wrote:

> Hrm-- so you're saying records for the same GroupByKey are ending up in
> different partitions when you're doing a secondary sort? Sounds like a bug
> in the SparkPartitioner we're using-- I wonder if it was the same bug that
> was fixed here?
>
> https://issues.apache.org/jira/browse/CRUNCH-556

Re: Secondary sort and partitioning in Spark

Posted by Josh Wills <jo...@gmail.com>.
Hrm-- so you're saying records for the same GroupByKey are ending up in
different partitions when you're doing a secondary sort? Sounds like a bug
in the SparkPartitioner we're using-- I wonder if it was the same bug that
was fixed here?

https://issues.apache.org/jira/browse/CRUNCH-556
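
For illustration, the invariant a correct partitioner has to maintain looks
roughly like this, sketched against Spark's Partitioner API (not the actual
class Crunch wires in):

import org.apache.spark.Partitioner;

// Illustrative key-hash partitioner. The invariant that matters: equal
// grouping keys must always map to the same partition, regardless of the
// secondary sort key.
class GroupByKeyPartitioner extends Partitioner {

  private final int numPartitions;

  GroupByKeyPartitioner(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public int getPartition(Object key) {
    // Partition on the grouping key only; if the sort key leaks into this
    // computation, a single group can straddle partitions, which would
    // match the behavior you're reporting.
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}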
