Posted to user@spark.apache.org by Pedro Rodriguez <sk...@gmail.com> on 2016/07/12 19:53:33 UTC

Tools for Balancing Partitions by Size

Hi,

Are there any tools for partitioning RDDs/DataFrames by size at runtime? The
idea would be to specify that each partition should be roughly X megabytes,
then write that through to S3. I haven't found anything off the shelf, and
looking through Stack Overflow posts doesn't seem to yield anything concrete.

Is there a way to programmatically get the size, or a size estimate, of an
RDD/DataFrame at runtime (e.g., the size of one record would be sufficient)?
I gave SizeEstimator a try, but the results varied quite a bit (I tried it on
the whole RDD and on a sample). It would also be useful to have programmatic
access to the size of the RDD in memory if it is cached.

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience

Re: Tools for Balancing Partitions by Size

Posted by Pedro Rodriguez <sk...@gmail.com>.
Hi Gourav,

In our case, we process raw logs into Parquet tables that downstream
applications can use for other jobs. The desired outcome is that we only
need to worry about unbalanced input data at the preprocessing step, so that
downstream jobs can assume balanced input data.

In our specific case this works because, although the raw log rows are of
variable size, the rows in the Spark SQL table end up with a fixed size once
we parse fields into primitives or chop arrays. Because of this, in our use
case it makes sense to think in terms of balanced file size, since it
correlates directly with a balanced number of rows per partition and thus
balanced partitions.

Given this setting, are there any specific issues you foresee? I agree that
file size isn't a general solution, but in this setting I don't see a reason
it should not work.

Our overall goal is to avoid two problems when we write data to S3:
- A large number of small files (KBs), since this makes S3 listing take a
long time
- A small number of large files (GBs), since this makes reads less efficient

Thus far, we have done this on a per-application basis with repartition and
a manually tuned number of partitions, but this is inconvenient. We are
interested in seeing if there is a way to automatically infer the number of
partitions we should use so that our files in S3 have a particular average
size (without incurring too high an overhead cost).

The solution that seems most promising right now is:

   - Define a custom write function which does two steps:
       - Write one partition to S3 and get its file size and number of records
       - Use that to determine the number of partitions to repartition to,
         then write everything to S3

What seems unclear is how to compute the parent RDD (suppose it's an RDD
with wide dependencies, like a join), write one partition for step (1), and
then not recompute anything for step (2) without an explicit cache. That
would make the job's additional overhead just the write of one partition to
S3, which seems acceptable. Perhaps it could be accomplished as follows:
RDD A computes the size of one partition, RDD B holds all partitions except
the one from A, both A and B have the original parent RDD as their parent,
and RDD C has parents A and B and performs the overall balanced write.
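
For concreteness, here is a rough PySpark sketch of that two-step write. It
is only an illustration: it sidesteps the recomputation question with an
explicit cache, it measures a sampled fraction rather than a single
partition, and the helper names, scratch prefix, and boto3 usage are
assumptions for the sketch rather than anything Spark provides.

import math
import boto3

def prefix_size_bytes(bucket, prefix):
    # Sum the sizes of all objects under an S3 prefix via ListObjectsV2.
    s3 = boto3.client("s3")
    total = 0
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            total += obj["Size"]
    return total

def balanced_write(df, bucket, prefix, target_mb=256, sample_fraction=0.01):
    df = df.cache()  # explicit cache; avoiding this is exactly the open question above
    total_rows = df.count()

    # Step 1: write a small sample to a scratch prefix and measure its size on S3.
    scratch = prefix.rstrip("/") + "_probe"
    sample = df.sample(False, sample_fraction)
    sample_rows = sample.count()
    sample.write.mode("overwrite").parquet("s3a://%s/%s" % (bucket, scratch))
    bytes_per_row = prefix_size_bytes(bucket, scratch) / float(max(sample_rows, 1))

    # Step 2: infer a partition count for the target file size and write everything.
    estimated_bytes = bytes_per_row * total_rows
    partitions = max(1, int(math.ceil(estimated_bytes / (target_mb * 1024.0 ** 2))))
    df.repartition(partitions).write.mode("overwrite").parquet("s3a://%s/%s" % (bucket, prefix))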

Thanks,
Pedro

On Wed, Jul 13, 2016 at 9:10 AM, Gourav Sengupta <go...@gmail.com>
wrote:

> Hi,
>
> Using file size is a very bad way of managing data unless you think that
> volume, variety, and veracity do not hold true. Actually, it is a very bad
> way of thinking about and designing data solutions; you are bound to hit
> bottlenecks, optimization issues, and manual interventions.
>
> I have found that thinking about data in logical partitions helps overcome
> most of the design problems mentioned above.
>
> You can either use repartition with shuffling or coalesce with shuffle
> turned off to manage loads.
>
> If you are using Hive, just let me know.
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Jul 13, 2016 at 5:39 AM, Pedro Rodriguez <sk...@gmail.com>
> wrote:
>
>> The primary goal for balancing partitions would be for the write to S3.
>> We would like to prevent unbalanced partitions (which we can do with
>> repartition), but also avoid partitions that are too small or too large.
>>
>> So for that case, getting the cache size would work, Maropu, if it's
>> roughly accurate, but for data ingest we aren't caching, just writing
>> straight through to S3.
>>
>> The idea of writing to disk and checking the size is interesting, Hatim.
>> For certain jobs, it seems very doable to write a small percentage of the
>> data to S3, check the file size through the AWS API, and use that to
>> estimate the total size. Thanks for the idea.
>>
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>>
>> pedrorodriguez.io | 909-353-4423
>> github.com/EntilZha | LinkedIn
>> <https://www.linkedin.com/in/pedrorodriguezscience>
>>
>> On July 12, 2016 at 7:26:17 PM, Hatim Diab (timdiab@gmail.com) wrote:
>>
>> Hi,
>>
>> Since the final size depends on data types and compression, I've had to
>> first get a rough estimate of the data written to disk, then compute the
>> number of partitions.
>>
>> partitions = int(ceil(size_data * conversion_ratio / block_size))
>>
>> In my case the block size is 256 MB, the source is txt, the destination is
>> snappy Parquet, and the conversion_ratio (compression ratio) is 0.6.
>>
>> df.repartition(partitions).write.parquet(output)
>>
>> This yields files in the range of 230 MB.
>>
>> Another way was to count and come up with an empirical formula.
>>
>> Cheers,
>> Hatim
>>
>>
>> On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro <li...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> There is no simple way to access the size from the driver side.
>> Since partitions of primitive-typed data (e.g., int) are compressed by
>> `DataFrame#cache`, the actual cached size can be somewhat different from
>> the size of the partitions being processed.
>>
>> // maropu
>>
>> On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez <ski.rodriguez@gmail.com
>> > wrote:
>>
>>> Hi,
>>>
>>> Are there any tools for partitioning RDDs/DataFrames by size at runtime?
>>> The idea would be to specify that each partition should be roughly X
>>> megabytes, then write that through to S3. I haven't found anything off
>>> the shelf, and looking through Stack Overflow posts doesn't seem to yield
>>> anything concrete.
>>>
>>> Is there a way to programmatically get the size, or a size estimate, of
>>> an RDD/DataFrame at runtime (e.g., the size of one record would be
>>> sufficient)? I gave SizeEstimator a try, but the results varied quite a
>>> bit (I tried it on the whole RDD and on a sample). It would also be
>>> useful to have programmatic access to the size of the RDD in memory if it
>>> is cached.
>>>
>>> Thanks,
>>> --
>>> Pedro Rodriguez
>>> PhD Student in Distributed Machine Learning | CU Boulder
>>> UC Berkeley AMPLab Alumni
>>>
>>> ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
>>> Github: github.com/EntilZha | LinkedIn:
>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience

Re: Tools for Balancing Partitions by Size

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

Using file size is a very bad way of managing data unless you think that
volume, variety, and veracity do not hold true. Actually, it is a very bad
way of thinking about and designing data solutions; you are bound to hit
bottlenecks, optimization issues, and manual interventions.

I have found that thinking about data in logical partitions helps overcome
most of the design problems mentioned above.

You can either use repartition with shuffling or coalesce with shuffle turned
off to manage loads.
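
For example, in PySpark (the partition counts here are arbitrary, and rdd
and df are assumed to already exist):

# repartition always shuffles and can increase or decrease the partition
# count; RDD.coalesce takes an explicit shuffle flag and, with shuffle=False,
# only merges existing partitions without a shuffle (it cannot increase the count).
rdd.repartition(200)
rdd.coalesce(50, shuffle=False)

# The DataFrame API is analogous; DataFrame.coalesce never shuffles.
df.repartition(200)
df.coalesce(50)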

If you are using Hive, just let me know.


Regards,
Gourav Sengupta

On Wed, Jul 13, 2016 at 5:39 AM, Pedro Rodriguez <sk...@gmail.com>
wrote:

> The primary goal for balancing partitions would be for the write to S3. We
> would like to prevent unbalanced partitions (which we can do with
> repartition), but also avoid partitions that are too small or too large.
>
> So for that case, getting the cache size would work, Maropu, if it's
> roughly accurate, but for data ingest we aren't caching, just writing
> straight through to S3.
>
> The idea of writing to disk and checking the size is interesting, Hatim.
> For certain jobs, it seems very doable to write a small percentage of the
> data to S3, check the file size through the AWS API, and use that to
> estimate the total size. Thanks for the idea.
>
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> <https://www.linkedin.com/in/pedrorodriguezscience>
>
> On July 12, 2016 at 7:26:17 PM, Hatim Diab (timdiab@gmail.com) wrote:
>
> Hi,
>
> Since the final size depends on data types and compression, I've had to
> first get a rough estimate of the data written to disk, then compute the
> number of partitions.
>
> partitions = int(ceil(size_data * conversion_ratio / block_size))
>
> In my case the block size is 256 MB, the source is txt, the destination is
> snappy Parquet, and the conversion_ratio (compression ratio) is 0.6.
>
> df.repartition(partitions).write.parquet(output)
>
> This yields files in the range of 230 MB.
>
> Another way was to count and come up with an empirical formula.
>
> Cheers,
> Hatim
>
>
> On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro <li...@gmail.com>
> wrote:
>
> Hi,
>
> There is no simple way to access the size from the driver side.
> Since partitions of primitive-typed data (e.g., int) are compressed by
> `DataFrame#cache`, the actual cached size can be somewhat different from
> the size of the partitions being processed.
>
> // maropu
>
> On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez <sk...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Are there any tools for partitioning RDDs/DataFrames by size at runtime?
>> The idea would be to specify that each partition should be roughly X
>> megabytes, then write that through to S3. I haven't found anything off the
>> shelf, and looking through Stack Overflow posts doesn't seem to yield
>> anything concrete.
>>
>> Is there a way to programmatically get the size, or a size estimate, of an
>> RDD/DataFrame at runtime (e.g., the size of one record would be
>> sufficient)? I gave SizeEstimator a try, but the results varied quite a
>> bit (I tried it on the whole RDD and on a sample). It would also be useful
>> to have programmatic access to the size of the RDD in memory if it is
>> cached.
>>
>> Thanks,
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>
>

Re: Tools for Balancing Partitions by Size

Posted by Pedro Rodriguez <sk...@gmail.com>.
The primary goal for balancing partitions would be for the write to S3. We would like to prevent unbalanced partitions (which we can do with repartition), but also avoid partitions that are too small or too large.

So for that case, getting the cache size would work, Maropu, if it's roughly accurate, but for data ingest we aren't caching, just writing straight through to S3.

The idea of writing to disk and checking the size is interesting, Hatim. For certain jobs, it seems very doable to write a small percentage of the data to S3, check the file size through the AWS API, and use that to estimate the total size. Thanks for the idea.
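
For example, with made-up numbers (assuming the sample's byte count came from
an S3 listing call such as ListObjectsV2):

from math import ceil

sample_fraction = 0.01                 # wrote 1% of the data to S3
sample_bytes = 310 * 1024 ** 2         # the AWS API reports ~310 MB under that prefix
target_file_bytes = 256 * 1024 ** 2    # desired average file size on S3

estimated_total_bytes = sample_bytes / sample_fraction              # ~31,000 MB
partitions = int(ceil(estimated_total_bytes / target_file_bytes))   # 122 files of ~254 MB each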

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 12, 2016 at 7:26:17 PM, Hatim Diab (timdiab@gmail.com) wrote:

Hi,

Since the final size depends on data types and compression, I've had to first get a rough estimate of the data written to disk, then compute the number of partitions.

partitions = int(ceil(size_data * conversion_ratio / block_size))

In my case the block size is 256 MB, the source is txt, the destination is snappy Parquet, and the conversion_ratio (compression ratio) is 0.6.

df.repartition(partitions).write.parquet(output)

This yields files in the range of 230 MB.

Another way was to count and come up with an empirical formula.

Cheers,
Hatim


On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro <li...@gmail.com> wrote:

Hi,

There is no simple way to access the size from the driver side.
Since partitions of primitive-typed data (e.g., int) are compressed by `DataFrame#cache`,
the actual cached size can be somewhat different from the size of the partitions being processed.

// maropu

On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez <sk...@gmail.com> wrote:
Hi,

Are there any tools for partitioning RDDs/DataFrames by size at runtime? The idea would be to specify that each partition should be roughly X megabytes, then write that through to S3. I haven't found anything off the shelf, and looking through Stack Overflow posts doesn't seem to yield anything concrete.

Is there a way to programmatically get the size, or a size estimate, of an RDD/DataFrame at runtime (e.g., the size of one record would be sufficient)? I gave SizeEstimator a try, but the results varied quite a bit (I tried it on the whole RDD and on a sample). It would also be useful to have programmatic access to the size of the RDD in memory if it is cached.

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience




--
---
Takeshi Yamamuro

Re: Tools for Balancing Partitions by Size

Posted by Hatim Diab <ti...@gmail.com>.
Hi,

Since the final size depends on data types and compression, I've had to first get a rough estimate of the data written to disk, then compute the number of partitions.

partitions = int(ceil(size_data * conversion_ratio / block_size))

In my case the block size is 256 MB, the source is txt, the destination is snappy Parquet, and the conversion_ratio (compression ratio) is 0.6.

df.repartition(partitions).write.parquet(output)

This yields files in the range of 230 MB.

Another way was to count and come up with an empirical formula.
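
With made-up numbers, the arithmetic works out like this (the resulting count
is what you would pass to df.repartition, as in the snippet above):

from math import ceil

size_data = 60 * 1024     # roughly 60 GB of raw text input, in MB
conversion_ratio = 0.6    # observed text-to-snappy-Parquet ratio
block_size = 256          # target file size in MB

partitions = int(ceil(size_data * conversion_ratio / block_size))   # 144 partitions of ~256 MB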

Cheers,
Hatim


> On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro <li...@gmail.com> wrote:
> 
> Hi,
> 
> There is no simple way to access the size from the driver side.
> Since partitions of primitive-typed data (e.g., int) are compressed by `DataFrame#cache`,
> the actual cached size can be somewhat different from the size of the partitions being processed.
> 
> // maropu
> 
>> On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez <sk...@gmail.com> wrote:
>> Hi,
>> 
>> Are there any tools for partitioning RDDs/DataFrames by size at runtime? The idea would be to specify that each partition should be roughly X megabytes, then write that through to S3. I haven't found anything off the shelf, and looking through Stack Overflow posts doesn't seem to yield anything concrete.
>>
>> Is there a way to programmatically get the size, or a size estimate, of an RDD/DataFrame at runtime (e.g., the size of one record would be sufficient)? I gave SizeEstimator a try, but the results varied quite a bit (I tried it on the whole RDD and on a sample). It would also be useful to have programmatic access to the size of the RDD in memory if it is cached.
>> 
>> Thanks,
>> -- 
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>> 
>> ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
> 
> 
> 
> -- 
> ---
> Takeshi Yamamuro

Re: Tools for Balancing Partitions by Size

Posted by Takeshi Yamamuro <li...@gmail.com>.
Hi,

There is no simple way to access the size from the driver side.
Since partitions of primitive-typed data (e.g., int) are compressed by
`DataFrame#cache`, the actual cached size can be somewhat different from the
size of the partitions being processed.
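
To see this concretely (the config key is a real Spark SQL setting; spark is
assumed to be a SparkSession and df an existing DataFrame):

# Cached DataFrames use Spark SQL's in-memory columnar format, and primitive
# columns are compressed when this setting is enabled (it defaults to true),
# so the size reported on the UI's Storage tab is usually smaller than the
# uncompressed size of the partitions while they are being processed.
print(spark.conf.get("spark.sql.inMemoryColumnarStorage.compressed"))

df.cache()
df.count()   # materialize the cache, then compare the reported storage size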

// maropu

On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez <sk...@gmail.com>
wrote:

> Hi,
>
> Are there any tools for partitioning RDDs/DataFrames by size at runtime?
> The idea would be to specify that each partition should be roughly X
> megabytes, then write that through to S3. I haven't found anything off the
> shelf, and looking through Stack Overflow posts doesn't seem to yield
> anything concrete.
>
> Is there a way to programmatically get the size, or a size estimate, of an
> RDD/DataFrame at runtime (e.g., the size of one record would be
> sufficient)? I gave SizeEstimator a try, but the results varied quite a bit
> (I tried it on the whole RDD and on a sample). It would also be useful to
> have programmatic access to the size of the RDD in memory if it is cached.
>
> Thanks,
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


-- 
---
Takeshi Yamamuro