Posted to dev@spark.apache.org by Nezih Yigitbasi <ny...@netflix.com.INVALID> on 2016/02/24 23:08:30 UTC

how about a custom coalesce() policy?

Hi Spark devs,

Some time ago I sent an email about my problem: I want to merge a large
number of small files with Spark. Currently I am using Hive with
CombineHiveInputFormat, and I can control the size of the output files with
the max split size parameter (which CombineHiveInputFormat uses when
coalescing the input splits). My first attempt was to use coalesce(), but
since coalesce() only considers the target number of partitions, the output
file sizes varied wildly.
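
To illustrate the problem, here is a simplified Python model. It is not
Spark's coalesce() implementation (which, for example, may also take data
locality into account); it only shows what happens when partitions are
grouped by count alone. The function name and the split sizes are made up
for illustration.

```python
def coalesce_by_count(sizes, num_partitions):
    """Group partition sizes into num_partitions contiguous buckets by count only."""
    n = len(sizes)
    groups = []
    for i in range(num_partitions):
        # Each bucket gets roughly n / num_partitions inputs, regardless of size.
        start = i * n // num_partitions
        end = (i + 1) * n // num_partitions
        groups.append(sizes[start:end])
    return groups


# Hypothetical input split sizes in MB: many small files plus a few large ones.
split_sizes_mb = [1, 2, 1, 120, 3, 2, 1, 1, 200, 5, 2, 90]
groups = coalesce_by_count(split_sizes_mb, 3)
output_sizes_mb = [sum(g) for g in groups]
print(output_sizes_mb)  # [124, 7, 297]
```

Every bucket holds four inputs, yet the resulting sizes range from 7 MB to
297 MB, because the sizes never enter the grouping decision.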

What I think would be useful is an optional PartitionCoalescer parameter (a
new interface) in the coalesce() method (or maybe a new method?) that
callers can implement for custom coalescing strategies. For my use case I
have already implemented a SizeBasedPartitionCoalescer that coalesces
partitions by looking at their sizes and by using a max split size
parameter, similar to CombineHiveInputFormat (I also had to expose
HadoopRDD to get access to the individual split sizes etc.).
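
A minimal sketch of the size-based idea, in plain Python rather than against
the Spark API. This is not the SizeBasedPartitionCoalescer mentioned above;
the function name, the greedy strategy, and the sample sizes are assumptions
chosen for illustration. It packs consecutive partitions into a group until
adding the next one would exceed the max split size.

```python
def coalesce_by_size(sizes, max_split_size):
    """Greedily pack consecutive partitions into groups of at most max_split_size.

    A single partition larger than max_split_size gets a group of its own.
    Returns a list of groups, each a list of partition indices.
    """
    groups = []
    current, current_size = [], 0
    for index, size in enumerate(sizes):
        # Close the current group if this partition would push it over the limit.
        if current and current_size + size > max_split_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical input split sizes in MB.
split_sizes_mb = [1, 2, 1, 120, 3, 2, 1, 1, 200, 5, 2, 90]
groups = coalesce_by_size(split_sizes_mb, max_split_size=128)
group_sizes_mb = [sum(split_sizes_mb[i] for i in g) for g in groups]
print(groups)          # [[0, 1, 2, 3, 4], [5, 6, 7], [8], [9, 10, 11]]
print(group_sizes_mb)  # [127, 4, 200, 97]
```

Here the number of output groups follows from the data and the size limit
instead of being fixed in advance, and a group exceeds the limit only when a
single partition is already larger than it. Because this sketch only merges
neighbouring partitions, small groups such as the 4 MB one can still occur;
a different packing strategy could reduce that.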

What do you think about such a change? Could it be useful to other users as
well? Or is there an easier way to accomplish the same merge logic? If you
think it may be useful, I already have an implementation and will be happy
to work with the community to contribute it.

Thanks,
Nezih

Re: how about a custom coalesce() policy?

Posted by Nezih Yigitbasi <ny...@netflix.com.INVALID>.
Sure, here <https://issues.apache.org/jira/browse/SPARK-14042> is the JIRA
and this <https://github.com/apache/spark/pull/11865> is the PR.

Nezih

On Sat, Apr 2, 2016 at 10:40 PM Hemant Bhanawat <he...@gmail.com>
wrote:

> correcting email id for Nezih
>
> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
> www.snappydata.io
>
> On Sun, Apr 3, 2016 at 11:09 AM, Hemant Bhanawat <he...@gmail.com>
> wrote:
>
>> Hi Nezih,
>>
>> Can you share JIRA and PR numbers?
>>
>> This partial de-coupling of data partitioning strategy and spark
>> parallelism would be a useful feature for any data store.
>>
>> Hemant
>>
>> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
>> www.snappydata.io
>>
>> On Fri, Apr 1, 2016 at 10:33 PM, Nezih Yigitbasi <
>> nyigitbasi@netflix.com.invalid> wrote:
>>
>>> Hey Reynold,
>>> Created an issue (and a PR) for this change to get discussions started.
>>>
>>> Thanks,
>>> Nezih
>>>
>>> On Fri, Feb 26, 2016 at 12:03 AM Reynold Xin <rx...@databricks.com>
>>> wrote:
>>>
>>>> Using the right email for Nezih
>>>>
>>>>
>>>> On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin <rx...@databricks.com>
>>>> wrote:
>>>>
>>>>> I think this can be useful.
>>>>>
>>>>> The only thing is that we are slowly migrating to the
>>>>> Dataset/DataFrame API, and leave RDD mostly as is as a lower level API.
>>>>> Maybe we should do both? In either case it would be great to discuss the
>>>>> API on a pull request. Cheers.
>>>>>
>>>>> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
>>>>> nyigitbasi@netflix.com.invalid> wrote:
>>>>>
>>>>>> Hi Spark devs,
>>>>>>
>>>>>> I have sent an email about my problem some time ago where I want to
>>>>>> merge a large number of small files with Spark. Currently I am using Hive
>>>>>> with the CombineHiveInputFormat and I can control the size of the
>>>>>> output files with the max split size parameter (which is used for
>>>>>> coalescing the input splits by the CombineHiveInputFormat). My first
>>>>>> attempt was to use coalesce(), but since coalesce only considers the
>>>>>> target number of partitions the output file sizes were varying wildly.
>>>>>>
>>>>>> What I think can be useful is to have an optional PartitionCoalescer
>>>>>> parameter (a new interface) in the coalesce() method (or maybe we
>>>>>> can add a new method ?) that the callers can implement for custom
>>>>>> coalescing strategies — for my use case I have already implemented a
>>>>>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>>>>>> their sizes and by using a max split size parameter, similar to the
>>>>>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access
>>>>>> to the individual split sizes etc.).
>>>>>>
>>>>>> What do you guys think about such a change, can it be useful to other
>>>>>> users as well? Or do you think that there is an easier way to accomplish
>>>>>> the same merge logic? If you think it may be useful, I already have
>>>>>> an implementation and I will be happy to work with the community to
>>>>>> contribute it.
>>>>>>
>>>>>> Thanks,
>>>>>> Nezih
>>>>>>
>>>>>
>>>>>
>>>>
>>
>

Re: how about a custom coalesce() policy?

Posted by Hemant Bhanawat <he...@gmail.com>.
correcting email id for Nezih

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Sun, Apr 3, 2016 at 11:09 AM, Hemant Bhanawat <he...@gmail.com>
wrote:

> Hi Nezih,
>
> Can you share JIRA and PR numbers?
>
> This partial de-coupling of data partitioning strategy and spark
> parallelism would be a useful feature for any data store.
>
> Hemant
>
> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
> www.snappydata.io
>
> On Fri, Apr 1, 2016 at 10:33 PM, Nezih Yigitbasi <
> nyigitbasi@netflix.com.invalid> wrote:
>
>> Hey Reynold,
>> Created an issue (and a PR) for this change to get discussions started.
>>
>> Thanks,
>> Nezih
>>
>> On Fri, Feb 26, 2016 at 12:03 AM Reynold Xin <rx...@databricks.com> wrote:
>>
>>> Using the right email for Nezih
>>>
>>>
>>> On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin <rx...@databricks.com>
>>> wrote:
>>>
>>>> I think this can be useful.
>>>>
>>>> The only thing is that we are slowly migrating to the Dataset/DataFrame
>>>> API, and leave RDD mostly as is as a lower level API. Maybe we should do
>>>> both? In either case it would be great to discuss the API on a pull
>>>> request. Cheers.
>>>>
>>>> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
>>>> nyigitbasi@netflix.com.invalid> wrote:
>>>>
>>>>> Hi Spark devs,
>>>>>
>>>>> I have sent an email about my problem some time ago where I want to
>>>>> merge a large number of small files with Spark. Currently I am using Hive
>>>>> with the CombineHiveInputFormat and I can control the size of the
>>>>> output files with the max split size parameter (which is used for
>>>>> coalescing the input splits by the CombineHiveInputFormat). My first
>>>>> attempt was to use coalesce(), but since coalesce only considers the
>>>>> target number of partitions the output file sizes were varying wildly.
>>>>>
>>>>> What I think can be useful is to have an optional PartitionCoalescer
>>>>> parameter (a new interface) in the coalesce() method (or maybe we can
>>>>> add a new method ?) that the callers can implement for custom coalescing
>>>>> strategies — for my use case I have already implemented a
>>>>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>>>>> their sizes and by using a max split size parameter, similar to the
>>>>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access
>>>>> to the individual split sizes etc.).
>>>>>
>>>>> What do you guys think about such a change, can it be useful to other
>>>>> users as well? Or do you think that there is an easier way to accomplish
>>>>> the same merge logic? If you think it may be useful, I already have
>>>>> an implementation and I will be happy to work with the community to
>>>>> contribute it.
>>>>>
>>>>> Thanks,
>>>>> Nezih
>>>>>
>>>>
>>>>
>>>
>

Re: how about a custom coalesce() policy?

Posted by Hemant Bhanawat <he...@gmail.com>.
Hi Nezih,

Can you share JIRA and PR numbers?

This partial decoupling of the data partitioning strategy from Spark
parallelism would be a useful feature for any data store.

Hemant

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Fri, Apr 1, 2016 at 10:33 PM, Nezih Yigitbasi <
nyigitbasi@netflix.com.invalid> wrote:

> Hey Reynold,
> Created an issue (and a PR) for this change to get discussions started.
>
> Thanks,
> Nezih
>
> On Fri, Feb 26, 2016 at 12:03 AM Reynold Xin <rx...@databricks.com> wrote:
>
>> Using the right email for Nezih
>>
>>
>> On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin <rx...@databricks.com>
>> wrote:
>>
>>> I think this can be useful.
>>>
>>> The only thing is that we are slowly migrating to the Dataset/DataFrame
>>> API, and leave RDD mostly as is as a lower level API. Maybe we should do
>>> both? In either case it would be great to discuss the API on a pull
>>> request. Cheers.
>>>
>>> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
>>> nyigitbasi@netflix.com.invalid> wrote:
>>>
>>>> Hi Spark devs,
>>>>
>>>> I have sent an email about my problem some time ago where I want to
>>>> merge a large number of small files with Spark. Currently I am using Hive
>>>> with the CombineHiveInputFormat and I can control the size of the
>>>> output files with the max split size parameter (which is used for
>>>> coalescing the input splits by the CombineHiveInputFormat). My first
>>>> attempt was to use coalesce(), but since coalesce only considers the
>>>> target number of partitions the output file sizes were varying wildly.
>>>>
>>>> What I think can be useful is to have an optional PartitionCoalescer
>>>> parameter (a new interface) in the coalesce() method (or maybe we can
>>>> add a new method ?) that the callers can implement for custom coalescing
>>>> strategies — for my use case I have already implemented a
>>>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>>>> their sizes and by using a max split size parameter, similar to the
>>>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access
>>>> to the individual split sizes etc.).
>>>>
>>>> What do you guys think about such a change, can it be useful to other
>>>> users as well? Or do you think that there is an easier way to accomplish
>>>> the same merge logic? If you think it may be useful, I already have an
>>>> implementation and I will be happy to work with the community to contribute
>>>> it.
>>>>
>>>> Thanks,
>>>> Nezih
>>>>
>>>
>>>
>>

Re: how about a custom coalesce() policy?

Posted by Nezih Yigitbasi <ny...@netflix.com.INVALID>.
Hey Reynold,
Created an issue (and a PR) for this change to get discussions started.

Thanks,
Nezih

On Fri, Feb 26, 2016 at 12:03 AM Reynold Xin <rx...@databricks.com> wrote:

> Using the right email for Nezih
>
>
> On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin <rx...@databricks.com> wrote:
>
>> I think this can be useful.
>>
>> The only thing is that we are slowly migrating to the Dataset/DataFrame
>> API, and leave RDD mostly as is as a lower level API. Maybe we should do
>> both? In either case it would be great to discuss the API on a pull
>> request. Cheers.
>>
>> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
>> nyigitbasi@netflix.com.invalid> wrote:
>>
>>> Hi Spark devs,
>>>
>>> I have sent an email about my problem some time ago where I want to
>>> merge a large number of small files with Spark. Currently I am using Hive
>>> with the CombineHiveInputFormat and I can control the size of the
>>> output files with the max split size parameter (which is used for
>>> coalescing the input splits by the CombineHiveInputFormat). My first
>>> attempt was to use coalesce(), but since coalesce only considers the
>>> target number of partitions the output file sizes were varying wildly.
>>>
>>> What I think can be useful is to have an optional PartitionCoalescer
>>> parameter (a new interface) in the coalesce() method (or maybe we can
>>> add a new method ?) that the callers can implement for custom coalescing
>>> strategies — for my use case I have already implemented a
>>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>>> their sizes and by using a max split size parameter, similar to the
>>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access to
>>> the individual split sizes etc.).
>>>
>>> What do you guys think about such a change, can it be useful to other
>>> users as well? Or do you think that there is an easier way to accomplish
>>> the same merge logic? If you think it may be useful, I already have an
>>> implementation and I will be happy to work with the community to contribute
>>> it.
>>>
>>> Thanks,
>>> Nezih
>>>
>>
>>
>

Re: how about a custom coalesce() policy?

Posted by Reynold Xin <rx...@databricks.com>.
Using the right email for Nezih


On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin <rx...@databricks.com> wrote:

> I think this can be useful.
>
> The only thing is that we are slowly migrating to the Dataset/DataFrame
> API, and leave RDD mostly as is as a lower level API. Maybe we should do
> both? In either case it would be great to discuss the API on a pull
> request. Cheers.
>
> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
> nyigitbasi@netflix.com.invalid> wrote:
>
>> Hi Spark devs,
>>
>> I have sent an email about my problem some time ago where I want to merge
>> a large number of small files with Spark. Currently I am using Hive with
>> the CombineHiveInputFormat and I can control the size of the output
>> files with the max split size parameter (which is used for coalescing the
>> input splits by the CombineHiveInputFormat). My first attempt was to use
>> coalesce(), but since coalesce only considers the target number of
>> partitions the output file sizes were varying wildly.
>>
>> What I think can be useful is to have an optional PartitionCoalescer
>> parameter (a new interface) in the coalesce() method (or maybe we can
>> add a new method ?) that the callers can implement for custom coalescing
>> strategies — for my use case I have already implemented a
>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>> their sizes and by using a max split size parameter, similar to the
>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access to
>> the individual split sizes etc.).
>>
>> What do you guys think about such a change, can it be useful to other
>> users as well? Or do you think that there is an easier way to accomplish
>> the same merge logic? If you think it may be useful, I already have an
>> implementation and I will be happy to work with the community to contribute
>> it.
>>
>> Thanks,
>> Nezih
>>
>
>

Re: how about a custom coalesce() policy?

Posted by Reynold Xin <rx...@databricks.com>.
I think this can be useful.

The only thing is that we are slowly migrating to the Dataset/DataFrame
API and leaving RDD mostly as is, as a lower-level API. Maybe we should do
both? In either case it would be great to discuss the API on a pull
request. Cheers.

On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
nyigitbasi@netflix.com.invalid> wrote:

> Hi Spark devs,
>
> I have sent an email about my problem some time ago where I want to merge
> a large number of small files with Spark. Currently I am using Hive with
> the CombineHiveInputFormat and I can control the size of the output files
> with the max split size parameter (which is used for coalescing the input
> splits by the CombineHiveInputFormat). My first attempt was to use
> coalesce(), but since coalesce only considers the target number of
> partitions the output file sizes were varying wildly.
>
> What I think can be useful is to have an optional PartitionCoalescer
> parameter (a new interface) in the coalesce() method (or maybe we can add
> a new method ?) that the callers can implement for custom coalescing
> strategies — for my use case I have already implemented a
> SizeBasedPartitionCoalescer that coalesces partitions by looking at their
> sizes and by using a max split size parameter, similar to the
> CombineHiveInputFormat (I also had to expose HadoopRDD to get access to
> the individual split sizes etc.).
>
> What do you guys think about such a change, can it be useful to other
> users as well? Or do you think that there is an easier way to accomplish
> the same merge logic? If you think it may be useful, I already have an
> implementation and I will be happy to work with the community to contribute
> it.
>
> Thanks,
> Nezih
>