Posted to user@spark.apache.org by Steve Lewis <lo...@gmail.com> on 2014/11/25 19:06:50 UTC

Why is this operation so expensive

I have a JavaPairRDD<KeyType,Tuple2<Type1,Type2>> originalPairs. There are
on the order of 100 million elements

I call a function to rearrange the tuples:

    JavaPairRDD<String, Tuple2<Type1, Type2>> newPairs =
        originalPairs.values().mapToPair(
            new PairFunction<Tuple2<Type1, Type2>, String, Tuple2<Type1, Type2>>() {
                @Override
                public Tuple2<String, Tuple2<Type1, Type2>> call(final Tuple2<Type1, Type2> t) {
                    // re-key each pair by the String id of its first element
                    return new Tuple2<String, Tuple2<Type1, Type2>>(t._1().getId(), t);
                }
            });

where Type1.getId() returns a String

The data are spread across 120 partitions on 15 machines. The operation is
dead simple, and yet it takes 5 minutes to generate the data and over 30
minutes to perform this simple operation. I am at a loss to understand
what is taking so long or how to make it faster. At this stage there is no
reason to move data to different partitions.
Anyone have bright ideas? Oh yes, Type1 and Type2 are moderately complex
objects weighing in at about 10 KB each.

Re: Why is this operation so expensive

Posted by Steve Lewis <lo...@gmail.com>.
If I combineByKey in the next step, I suppose I am paying for a shuffle I
need anyway - right?
Also, if I supply a custom partitioner rather than relying on the hash, can
I control where and how the data is shuffled? Overriding equals() and
hashCode() could be a bad thing, but a custom partitioner is less dangerous.
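
Something like this sketch is what I have in mind - a minimal example, where
the class name and the hash-modulo rule are mine, not code from the actual
job:

    import org.apache.spark.Partitioner;

    // Sketch of a custom partitioner over the String ids from getId().
    // The modulo rule is a placeholder; any deterministic id -> partition
    // mapping could go here.
    public class IdPartitioner extends Partitioner {
        private final int numPartitions;

        public IdPartitioner(final int numPartitions) {
            this.numPartitions = numPartitions;
        }

        @Override
        public int numPartitions() {
            return numPartitions;
        }

        @Override
        public int getPartition(final Object key) {
            // hashCode() % n lies in (-n, n), so abs() keeps it in [0, n)
            return Math.abs(((String) key).hashCode() % numPartitions);
        }
    }

    // Pay for the shuffle once, e.g. keeping the existing 120 partitions:
    //   newPairs.partitionBy(new IdPartitioner(120));
    // or hand the partitioner to the combineByKey overload that takes one.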

On Tue, Nov 25, 2014 at 1:55 PM, Andrew Ash <an...@andrewash.com> wrote:

> Hi Steve,
>
> You changed the first value in a Tuple2, which is the one that Spark uses
> to hash and determine where in the cluster to place the value.  By changing
> the first part of the PairRDD, you've implicitly asked Spark to reshuffle
> the data according to the new keys.  I'd guess that you would observe large
> amounts of shuffle in the web UI as a result of this code.
>
> If you don't actually need your data shuffled by the first part of the
> pair RDD, then consider not putting the KeyType in the first half of the
> PairRDD.  An alternative is to make the .equals() and .hashCode() of
> KeyType delegate to the .getId() method you use in the anonymous function.
>
> Cheers,
> Andrew


-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com

Re: Why is this operation so expensive

Posted by Andrew Ash <an...@andrewash.com>.
Hi Steve,

You changed the first value in a Tuple2, which is the one that Spark uses
to hash and determine where in the cluster to place the value.  By changing
the first part of the PairRDD, you've implicitly asked Spark to reshuffle
the data according to the new keys.  I'd guess that you would observe large
amounts of shuffle in the web UI as a result of this code.

If you don't actually need your data shuffled by the first part of the pair
RDD, then consider not putting the KeyType in the first half of the
PairRDD.  An alternative is to make the .equals() and .hashCode() of
KeyType delegate to the .getId() method you use in the anonymous function.
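
Roughly like this - a minimal sketch, where the id field and the constructor
are my assumptions about what KeyType looks like:

    import java.io.Serializable;

    // Sketch only: KeyType compares and hashes by its String id, so hash
    // partitioning places a KeyType-keyed pair in the same partition the
    // equivalent String-keyed pair would land in - no re-keying pass needed.
    public class KeyType implements Serializable {
        private final String id;  // assumed field

        public KeyType(final String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }

        @Override
        public boolean equals(final Object other) {
            return other instanceof KeyType
                && id.equals(((KeyType) other).getId());
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }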

Cheers,
Andrew
