Posted to user@spark.apache.org by Nitay Joffe <ni...@actioniq.co> on 2014/11/20 17:54:54 UTC

Spark S3 Performance

I have a simple S3 job to read a text file and do a line count.
Specifically, I'm doing *sc.textFile("s3n://mybucket/myfile").count*. The
file is about 1.2GB. My setup is a standalone Spark cluster with 4 workers,
each with 2 cores / 16GB RAM. I'm using branch-1.2 code built against
Hadoop 2.4 (though I'm not actually using HDFS, just straight S3 => Spark).

The whole count is taking on the order of a couple of minutes, which seems
extremely slow. I've been looking into it and so far have noticed two
things; I'm hoping the community has seen this before and knows what to do.

1) Every executor seems to make an S3 call to read the *entire file* before
making another call to read just its split. Here's a paste I've cleaned up
to show just one task: http://goo.gl/XCfyZA. I've verified this happens in
every task, and it takes a long time (40-50 seconds). I don't see why it is
doing this.
2) I've tried a few values for the numPartitions parameter. Anything below
21 seems to get ignored: under the hood, FileInputFormat always ends up
with at least 21 partitions of ~64MB or so. I've also tried 40, 60, and 100
partitions and have seen that performance only gets worse as I increase it
beyond 21. I would like to try 8 just to see, but again I don't see how to
force it below 21.
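
For reference, the whole job boils down to roughly the following in
spark-shell (bucket and file names here are placeholders; the second
argument is the numPartitions parameter from point 2):

    // Placeholder bucket/key; `sc` is the SparkContext that spark-shell provides.
    val lines = sc.textFile("s3n://mybucket/myfile", 8)  // numPartitions from point 2
    println(lines.partitions.size)                       // reports 21, not 8
    println(lines.count())                               // takes a couple of minutes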

Thanks for the help,
- Nitay
Founder & CTO

Re: Spark S3 Performance

Posted by Andrei <fa...@gmail.com>.
Not that I'm a professional user of Amazon services, but I have a guess about
your performance issue. According to [1], there are two different filesystems
over S3:

 - native, which behaves just like regular files (scheme: s3n)
 - block-based, which looks more like HDFS (scheme: s3)

Since you use "s3n" in your URL, each Spark worker seems to treat the file
as an unsplittable piece of data and downloads all of it (though it probably
applies functions to specific regions only). If I understand it correctly,
using "s3" instead will let Spark workers see the data as a sequence of
blocks and download each block separately.
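
If you want to try it, the change would just be the URL scheme. A rough,
untested sketch (I'm writing the credential property names from memory, so
please double-check them against your Hadoop version):

    // Untested sketch: same job, block-based s3:// scheme instead of s3n://.
    // Caveat, as far as I remember: the block-based filesystem can only read
    // data that was written through it, not plain objects uploaded directly.
    sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "...")      // property names assumed
    sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "...")
    val count = sc.textFile("s3://mybucket/myfile").count()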

Either way, using S3 implies a loss of data locality, so data will be
transferred to the workers instead of code being transferred to the data.
Given a data size of 1.2GB, consider also storing the data in Hadoop's HDFS
instead of S3 (as far as I remember, Amazon allows using both at the same
time).

Please let us know if it works.


[1]: https://wiki.apache.org/hadoop/AmazonS3

On Sat, Nov 22, 2014 at 6:21 PM, Nitay Joffe <ni...@actioniq.co> wrote:

> Err I meant #1 :)
>
> - Nitay
> Founder & CTO
>

Re: Spark S3 Performance

Posted by Daniil Osipov <da...@shazam.com>.
Can you verify, using network monitoring stats, that it's reading the entire
file on each worker? If it is, that would be a bug in my opinion.

On Mon, Nov 24, 2014 at 2:06 PM, Nitay Joffe <ni...@actioniq.co> wrote:

> Andrei, Ashish,
>
> To be clear, I don't think it's *counting* the entire file. It just seems,
> from the logging and the timing, that it is doing a GET of the entire file,
> then figuring out that it only needs certain blocks and doing another GET
> for just the specific block.
>
> Regarding the # of partitions - I think I see now that it has to do with
> Hadoop's block size being set at 64MB. This is not a big deal to me; the
> main issue is the first one: why is every worker making a call to get the
> entire file followed by the *real* call to get only the specific partitions
> it needs?
>
> Best,
>
> - Nitay
> Founder & CTO
>

Re: Spark S3 Performance

Posted by Nitay Joffe <ni...@actioniq.co>.
Andrei, Ashish,

To be clear, I don't think it's *counting* the entire file. It just seems,
from the logging and the timing, that it is doing a GET of the entire file,
then figuring out that it only needs certain blocks and doing another GET
for just the specific block.

Regarding the # of partitions - I think I see now that it has to do with
Hadoop's block size being set at 64MB. This is not a big deal to me; the
main issue is the first one: why is every worker making a call to get the
entire file followed by the *real* call to get only the specific partitions
it needs?
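
(For completeness: the split size seems to come from the block size the s3n
connector reports, so if I did want fewer partitions I would probably try
something like the sketch below. It's unverified, and the property name is an
assumption on my part.)

    // Unverified sketch: raise the block size the s3n connector reports so that
    // FileInputFormat creates fewer, larger splits. The property is assumed to
    // be fs.s3n.block.size (64MB default) - check your Hadoop version.
    sc.hadoopConfiguration.set("fs.s3n.block.size", (256L * 1024 * 1024).toString)
    val rdd = sc.textFile("s3n://mybucket/myfile")
    println(rdd.partitions.size)   // roughly 5 for a 1.2GB file at 256MB splits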

Best,

- Nitay
Founder & CTO


On Sat, Nov 22, 2014 at 8:28 PM, Andrei <fa...@gmail.com> wrote:

> Concerning your second question, I believe you are trying to set the number
> of partitions with something like this:
>
>     rdd = sc.textFile(..., 8)
>
> but things like `textFile()` don't actually take a fixed number of
> partitions. Instead, they expect a *minimum* number of partitions. Since
> your file has 21 blocks of data, it creates exactly 21 partitions (which is
> greater than 8, as expected). To set an exact number of partitions, use
> `repartition()` or its more general form, `coalesce()` (see example [1]).
>
> [1]:
> http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#coalesce
>

Re: Spark S3 Performance

Posted by Andrei <fa...@gmail.com>.
Concerning your second question, I believe you are trying to set the number
of partitions with something like this:

    rdd = sc.textFile(..., 8)

but things like `textFile()` don't actually take a fixed number of
partitions. Instead, they expect a *minimum* number of partitions. Since
your file has 21 blocks of data, it creates exactly 21 partitions (which is
greater than 8, as expected). To set an exact number of partitions, use
`repartition()` or its more general form, `coalesce()` (see example [1]).

[1]:
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#coalesce
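
To make it concrete, here is a quick sketch (bucket and file names are
placeholders):

    // The second argument is only a lower bound on the number of partitions,
    // so this still yields 21 partitions for a file with 21 blocks:
    val rdd = sc.textFile("s3n://mybucket/myfile", 8)
    println(rdd.partitions.size)     // 21
    // coalesce merges partitions without a shuffle by default; repartition(8)
    // is equivalent to coalesce(8, shuffle = true):
    val eight = rdd.coalesce(8)
    println(eight.partitions.size)   // 8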



On Sat, Nov 22, 2014 at 10:04 PM, Ashish Rangole <ar...@gmail.com> wrote:

> What makes you think that each executor is reading the whole file? If that
> were the case, the count returned to the driver would be the actual line
> count multiplied by the number of executors. Is that what you see when you
> compare it with the actual number of lines in the input file? If the count
> returned matches the actual number, you probably don't have an extra-read
> problem.
>
> I also see this in your logs, which indicates a read that starts at an
> offset and covers one split (64MB) worth of data:
>
> 14/11/20 15:39:45 [Executor task launch worker-1 ] INFO HadoopRDD: Input
> split: s3n://mybucket/myfile:335544320+67108864

Re: Spark S3 Performance

Posted by Ashish Rangole <ar...@gmail.com>.
What makes you think that each executor is reading the whole file? If that
were the case, the count returned to the driver would be the actual line
count multiplied by the number of executors. Is that what you see when you
compare it with the actual number of lines in the input file? If the count
returned matches the actual number, you probably don't have an extra-read
problem.
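
As a rough sketch of that check (the expected value is a placeholder you
would get from an independent count, e.g. wc -l on a copy of the file):

    // If every task had counted the whole file, the result would be roughly
    // actualLines * numberOfTasks rather than actualLines.
    val sparkCount  = sc.textFile("s3n://mybucket/myfile").count()
    val actualLines = 10000000L   // placeholder from an independent line count
    println(s"spark=$sparkCount actual=$actualLines ratio=${sparkCount.toDouble / actualLines}")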

I also see this in your logs, which indicates a read that starts at an
offset and covers one split (64MB) worth of data:

14/11/20 15:39:45 [Executor task launch worker-1 ] INFO HadoopRDD: Input
split: s3n://mybucket/myfile:335544320+67108864
On Nov 22, 2014 7:23 AM, "Nitay Joffe" <ni...@actioniq.co> wrote:

> Err I meant #1 :)
>
> - Nitay
> Founder & CTO
>

Re: Spark S3 Performance

Posted by Nitay Joffe <ni...@actioniq.co>.
Err I meant #1 :)

- Nitay
Founder & CTO


On Sat, Nov 22, 2014 at 10:20 AM, Nitay Joffe <ni...@actioniq.co> wrote:

> Anyone have any thoughts on this? Trying to understand especially #2 if
> it's a legit bug or something I'm doing wrong.
>
> - Nitay
> Founder & CTO
>

Re: Spark S3 Performance

Posted by Nitay Joffe <ni...@actioniq.co>.
Anyone have any thoughts on this? Trying to understand especially #2 if
it's a legit bug or something I'm doing wrong.

- Nitay
Founder & CTO


On Thu, Nov 20, 2014 at 11:54 AM, Nitay Joffe <ni...@actioniq.co> wrote:

> I have a simple S3 job to read a text file and do a line count.
> Specifically, I'm doing *sc.textFile("s3n://mybucket/myfile").count*. The
> file is about 1.2GB. My setup is a standalone Spark cluster with 4 workers,
> each with 2 cores / 16GB RAM. I'm using branch-1.2 code built against
> Hadoop 2.4 (though I'm not actually using HDFS, just straight S3 => Spark).
>
> The whole count is taking on the order of a couple of minutes, which seems
> extremely slow. I've been looking into it and so far have noticed two
> things; I'm hoping the community has seen this before and knows what to do.
>
> 1) Every executor seems to make an S3 call to read the *entire file* before
> making another call to read just its split. Here's a paste I've cleaned up
> to show just one task: http://goo.gl/XCfyZA. I've verified this happens in
> every task, and it takes a long time (40-50 seconds). I don't see why it is
> doing this.
> 2) I've tried a few values for the numPartitions parameter. Anything below
> 21 seems to get ignored: under the hood, FileInputFormat always ends up
> with at least 21 partitions of ~64MB or so. I've also tried 40, 60, and 100
> partitions and have seen that performance only gets worse as I increase it
> beyond 21. I would like to try 8 just to see, but again I don't see how to
> force it below 21.
>
> Thanks for the help,
> - Nitay
> Founder & CTO
>
>