Posted to dev@spark.apache.org by Matt Cheah <mc...@palantir.com> on 2015/02/19 01:47:11 UTC
[Performance] Possible regression in rdd.take()?
Hi everyone,
Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
consistently has a slower execution time on the later release. I was
wondering if anyone else has had similar observations.
I have two setups where this reproduces. The first is a local test. I
launched a Spark cluster with 4 worker JVMs on my Mac and started a
spark-shell. I read in the text file and immediately called rdd.take(N) on
it, for varying N. The RDD is a plaintext CSV, 4 GB in size, split over 8
files, which ends up as 128 partitions and a total of 80,000,000 rows.
The timings I measured on Spark 1.0.2 and Spark 1.1.1 are, in seconds:
10000 items
Spark 1.0.2: 0.069281, 0.012261, 0.011083
Spark 1.1.1: 0.11577, 0.097636, 0.11321
40000 items
Spark 1.0.2: 0.023751, 0.069365, 0.023603
Spark 1.1.1: 0.224287, 0.229651, 0.158431
100000 items
Spark 1.0.2: 0.047019, 0.049056, 0.042568
Spark 1.1.1: 0.353277, 0.288965, 0.281751
400000 items
Spark 1.0.2: 0.216048, 0.198049, 0.796037
Spark 1.1.1: 1.865622, 2.224424, 2.037672
This small test suite indicates a consistently reproducible performance
regression.
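For reference, the shape of the measurement was just timing repeated rdd.take(N) calls. A minimal sketch of such a timing loop in Python, where `time_take` and `take_fn` are illustrative stand-ins for the original spark-shell session rather than Spark itself:

```python
import time

def time_take(take_fn, n, repeats=3):
    """Time `repeats` calls of a take-like function, returning seconds per call.

    `take_fn` stands in for rdd.take; this harness is a sketch of the
    measurement methodology, not the original benchmark.
    """
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        take_fn(n)
        timings.append(time.perf_counter() - start)
    return timings
```

Each row of numbers above corresponds to three such repeated calls.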
I also see this in a larger-scale test. The cluster used is on EC2:
ec2 instance type: m2.4xlarge
10 slaves, 1 master
ephemeral storage
70 cores, 50 GB/box
In this case, I have a 100GB dataset split into 78 files totaling 350 million
items, and I take the first 50,000 items from the RDD. I have tested this on
different formats of the raw data.
With plaintext files:
Spark 1.0.2: 0.422s, 0.363s, 0.382s
Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s
With snappy-compressed Avro files:
Spark 1.0.2: 0.73s, 0.395s, 0.426s
Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s
Again demonstrating a reproducible performance regression.
I was wondering if anyone else has observed this regression, and if so,
whether anyone has an idea of what could have caused it between Spark
1.0.2 and Spark 1.1.1?
Thanks,
-Matt Cheah
Re: [Performance] Possible regression in rdd.take()?
Posted by Matt Cheah <mc...@palantir.com>.
Ah okay, I turned on spark.localExecution.enabled and the performance
returned to what Spark 1.0.2 had. However, I can see how users could
inadvertently incur memory and network strain by fetching whole
partitions to the driver.
I'll evaluate on my side if we want to turn this on or not. Thanks for the
quick and accurate response!
-Matt Cheah
Re: [Performance] Possible regression in rdd.take()?
Posted by Aaron Davidson <il...@gmail.com>.
You might be seeing the result of this patch:
https://github.com/apache/spark/commit/d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797
which was introduced in 1.1.1. This patch disabled the ability for take()
to run without launching a Spark job, which means that the latency is
significantly increased for small jobs (but not for large ones). You can
try enabling local execution and seeing if your problem goes away.
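A sketch of how the flag could be turned on, assuming PySpark: spark.localExecution.enabled is the property named in this thread, while the app name and the rest of the setup are illustrative, not taken from the original sessions.

```python
# Config sketch only: assumes a Spark 1.1.x installation with pyspark available.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("take-benchmark")  # hypothetical app name
        .set("spark.localExecution.enabled", "true"))  # re-enable driver-local take()
sc = SparkContext(conf=conf)
```

The same property can equally be set in conf/spark-defaults.conf or via spark-shell's --conf option.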
Re: [Performance] Possible regression in rdd.take()?
Posted by Matt Cheah <mc...@palantir.com>.
I actually tested Spark 1.2.0 with the code in the rdd.take() method
swapped out for what was in Spark 1.0.2. The run time was still slower,
which suggests to me that something lower in the stack is at work.
-Matt Cheah
Re: [Performance] Possible regression in rdd.take()?
Posted by Patrick Wendell <pw...@gmail.com>.
I believe the heuristic governing the way take() decides to fetch
partitions changed between these versions. It could be that in certain
cases the new heuristic is worse, but it might be good to just look at
the source code and see, for your number of elements taken and number
of partitions, whether there was any effective change in how aggressively
Spark fetched partitions.
This was quite a while ago, but I think the change was made because in
many cases the newer code works more efficiently.
- Patrick
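Patrick's point about the heuristic can be made concrete. The sketch below simulates a grow-by-4x partition-scanning strategy of the kind Spark's take() uses: scan one partition, then estimate how many more are needed from what came back. It is a simplification written for this thread, not the actual RDD.take code; the function name and the 1.5x safety margin are illustrative.

```python
def partitions_scanned(num_items, items_per_partition, total_partitions):
    """Simulate how many partitions (and scheduler rounds) a take(num_items)
    would touch under a simplified grow-by-4x scanning heuristic."""
    buf = 0       # items collected so far
    scanned = 0   # partitions scanned so far
    rounds = 0    # scheduler rounds (jobs) launched
    while buf < num_items and scanned < total_partitions:
        if scanned == 0:
            to_try = 1                  # first round: try a single partition
        elif buf == 0:
            to_try = scanned * 4        # nothing found yet: grow aggressively
        else:
            # estimate partitions still needed, with a 1.5x margin,
            # capped at 4x what has been scanned so far
            to_try = max(int(1.5 * num_items * scanned / buf) - scanned, 1)
            to_try = min(to_try, scanned * 4)
        to_try = min(to_try, total_partitions - scanned)
        buf += to_try * items_per_partition
        scanned += to_try
        rounds += 1
    return scanned, rounds
```

Under this model, the local dataset in the thread (80,000,000 rows over 128 partitions, i.e. 625,000 rows per partition) satisfies even take(400000) from the first partition in a single round, so the slowdown would be dominated by per-job launch overhead rather than by scanning extra partitions, which would be consistent with Aaron's explanation.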