Posted to user@spark.apache.org by Nicholas Chammas <ni...@gmail.com> on 2014/04/01 22:51:10 UTC

Re: Best practices: Parallelized write to / read from S3

Alright, so I've upped the minSplits parameter on my call to textFile, but
the resulting RDD still has only 1 partition, which I assume means it was
read in on a single process. I am checking the number of partitions in
pyspark by using the rdd._jrdd.splits().size() trick I picked up on this
list.
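
For reference, the check looks roughly like this (bucket and file names made
up):

rdd = sc.textFile("s3n://my-bucket/big-file.txt.gz", minSplits=10)
print(rdd._jrdd.splits().size())  # still prints 1, despite asking for 10 splits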

The source file is a gzipped text file. I have heard things about gzipped
files not being splittable.

Is this the reason that opening the file with minSplits = 10 still gives me
an RDD with one partition? If so, I guess the only way to speed up the load
would be to change the source file's format to something splittable.

Otherwise, if I want to speed up subsequent computation on the RDD, I
should explicitly partition it with a call to RDD.partitionBy(10).
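
Concretely, I'm picturing something like this (the key choice is made up;
partitionBy() wants a key-value RDD, so I'd key the lines first and then drop
the keys afterwards):

lines = sc.textFile("s3n://my-bucket/big-file.txt.gz")   # reads in as 1 partition
keyed = lines.map(lambda x: (hash(x), x))                # key each line so it can be partitioned
spread = keyed.partitionBy(10).map(lambda kv: kv[1])     # 10 partitions for later stages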

Is that correct?


On Mon, Mar 31, 2014 at 1:15 PM, Nicholas Chammas <
nicholas.chammas@gmail.com> wrote:

> OK sweet. Thanks for walking me through that.
>
> I wish this were StackOverflow so I could bestow some nice rep on all you
> helpful people.
>
>
> On Mon, Mar 31, 2014 at 1:06 PM, Aaron Davidson <il...@gmail.com> wrote:
>
>> Note that you may set minSplits to more than the number of cores in the
>> cluster, and Spark will just run as many tasks as possible at a time. This
>> can help if certain nodes are slow, for instance.
>>
>> In general, it is not necessarily the case that doubling the number of
>> cores doing IO will double the throughput, because you could be saturating
>> the throughput with fewer cores. However, S3 is odd in that each connection
>> gets way less bandwidth than your network link can provide, and it does
>> seem to scale linearly with the number of connections. So, yes, taking
>> minSplits up to 4 (or higher) will likely result in a 2x performance
>> improvement.
>>
>> saveAsTextFile() will use as many partitions (aka splits) as the RDD it's
>> being called on has. So for instance:
>>
>> sc.textFile(myInputFile, 15).map(lambda x: x + "!!!").saveAsTextFile(myOutputFile)
>>
>> will use 15 partitions to read the text file (i.e., up to 15 cores at a
>> time) and then again to save back to S3.
>>
>>
>>
>> On Mon, Mar 31, 2014 at 9:46 AM, Nicholas Chammas <
>> nicholas.chammas@gmail.com> wrote:
>>
>>> So setting minSplits <http://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.context.SparkContext-class.html#textFile> will
>>> set the parallelism on the read in SparkContext.textFile(), assuming I have
>>> the cores in the cluster to deliver that level of parallelism. And if I
>>> don't explicitly provide it, Spark will set the minSplits to 2.
>>>
>>> So for example, say I have a cluster with 4 cores total, and it takes 40
>>> minutes to read a single file from S3 with minSplits at 2. It should take
>>> roughly 20 minutes to read the same file if I up minSplits to 4.
>>>
>>> Did I understand that correctly?
>>>
>>> RDD.saveAsTextFile() doesn't have an analog to minSplits, so I'm
>>> guessing that's not an operation the user can tune.
>>>
>>>
>>>> On Mon, Mar 31, 2014 at 12:29 PM, Aaron Davidson <il...@gmail.com> wrote:
>>>
>>>> Spark will only use each core for one task at a time, so doing
>>>>
>>>> sc.textFile(<s3 location>, <num reducers>)
>>>>
>>>> where you set "num reducers" to at least as many as the total number of
>>>> cores in your cluster, is about as fast as you can get out of the box. Same
>>>> goes for saveAsTextFile.
>>>>
>>>>
>>>> On Mon, Mar 31, 2014 at 8:49 AM, Nicholas Chammas <
>>>> nicholas.chammas@gmail.com> wrote:
>>>>
>>>>> Howdy-doody,
>>>>>
>>>>> I have a single, very large file sitting in S3 that I want to read in
>>>>> with sc.textFile(). What are the best practices for reading in this file as
>>>>> quickly as possible? How do I parallelize the read as much as possible?
>>>>>
>>>>> Similarly, say I have a single, very large RDD sitting in memory that
>>>>> I want to write out to S3 with RDD.saveAsTextFile(). What are the best
>>>>> practices for writing this file out as quickly as possible?
>>>>>
>>>>> Nick
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Best practices: Parallelized write to / read from S3

Posted by Nicholas Chammas <ni...@gmail.com>.
Alright!

Thanks for that link. I did a little research based on it, and it looks like
Snappy or LZO with a splittable container format would be better alternatives
to gzip.

I confirmed that gzip was cramping my style by trying sc.textFile() on an
uncompressed version of the text file. With the uncompressed file, setting
minSplits gives me an RDD that is partitioned as expected. This makes all
my subsequent operations, obviously, much faster.
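
For the record, the comparison was roughly this (paths made up):

gz = sc.textFile("s3n://my-bucket/data.txt.gz", minSplits=10)
txt = sc.textFile("s3n://my-bucket/data.txt", minSplits=10)
print(gz._jrdd.splits().size())   # 1 -- gzip isn't splittable
print(txt._jrdd.splits().size())  # 10 or more -- uncompressed text splits fine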

I dunno if it would be appropriate to have Spark issue some kind of warning
that "Hey, your file is compressed using gzip so..."

Anyway, mystery solved!

Nick


On Tue, Apr 1, 2014 at 5:03 PM, Aaron Davidson <il...@gmail.com> wrote:

> Looks like you're right that gzip files are not easily splittable [1], and
> also about everything else you said.
>
> [1]
> http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3CCANDWdjY2hN-=jXTSNZ8JHZ=G-S+ZKLNze=RGkJacJAW3tTOQQA@mail.gmail.com%3E

Re: Best practices: Parallelized write to / read from S3

Posted by Aaron Davidson <il...@gmail.com>.
Looks like you're right that gzip files are not easily splittable [1], and
also about everything else you said.

[1]
http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3CCANDWdjY2hN-=jXTSNZ8JHZ=G-S+ZKLNze=RGkJacJAW3tTOQQA@mail.gmail.com%3E




On Tue, Apr 1, 2014 at 1:51 PM, Nicholas Chammas <nicholas.chammas@gmail.com> wrote:

> Alright, so I've upped the minSplits parameter on my call to textFile, but
> the resulting RDD still has only 1 partition, which I assume means it was
> read in on a single process. I am checking the number of partitions in
> pyspark by using the rdd._jrdd.splits().size() trick I picked up on this
> list.
>
> The source file is a gzipped text file. I have heard things about gzipped
> files not being splittable.
>
> Is this the reason that opening the file with minSplits = 10 still gives
> me an RDD with one partition? If so, I guess the only way to speed up the
> load would be to change the source file's format to something splittable.
>
> Otherwise, if I want to speed up subsequent computation on the RDD, I
> should explicitly partition it with a call to RDD.partitionBy(10).
>
> Is that correct?