Posted to user@spark.apache.org by Cesar Flores <ce...@gmail.com> on 2016/02/08 23:30:45 UTC

Optimal way to re-partition from a single partition

I have a DataFrame which I sort using the orderBy function. This operation
causes my DataFrame to end up in a single partition. After using those
results, I would like to repartition to a larger number of partitions.
Currently I am just doing:

// df is a DataFrame with a single partition and around 14 million records
val rdd = df.rdd.coalesce(100, true)
val newDF = hc.createDataFrame(rdd, df.schema)

This process is really slow. Is there any other way of achieving this task,
or of optimizing it (perhaps by tweaking a Spark configuration parameter)?


Thanks a lot
-- 
Cesar Flores

Re: Optimal way to re-partition from a single partition

Posted by Takeshi Yamamuro <li...@gmail.com>.
The issue is still mostly unsolved, even in newer Spark versions.


On Wed, Feb 10, 2016 at 1:36 AM, Cesar Flores <ce...@gmail.com> wrote:

> Well, actually I am observing a single partition no matter what my input
> is. I am using Spark 1.3.1.
>
> From what you both are saying, it appears that this sorting issue (going to
> a single partition after applying orderBy on a DF) is solved in later
> versions of Spark? Well, if that is the case, I guess I just need to wait
> until my workplace decides to update.
>
>
> Thanks a lot



-- 
---
Takeshi Yamamuro

Re: Optimal way to re-partition from a single partition

Posted by Cesar Flores <ce...@gmail.com>.
Well, actually I am observing a single partition no matter what my input
is. I am using Spark 1.3.1.

From what you both are saying, it appears that this sorting issue (going to
a single partition after applying orderBy on a DF) is solved in later
versions of Spark? Well, if that is the case, I guess I just need to wait
until my workplace decides to update.


Thanks a lot

On Tue, Feb 9, 2016 at 9:39 AM, Takeshi Yamamuro <li...@gmail.com>
wrote:

> Hi,
>
> DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
> `HashPartitioning`.
> `RangePartitioning` roughly samples the input data and internally computes
> partition bounds to split the given rows into `spark.sql.shuffle.partitions`
> partitions. Therefore, when the sort keys are highly skewed, I think some
> partitions could end up being empty (that is, the number of result
> partitions is lower than `spark.sql.shuffle.partitions`).



-- 
Cesar Flores

Re: Optimal way to re-partition from a single partition

Posted by Hemant Bhanawat <he...@gmail.com>.
OK. I was comparing groupBy with orderBy, and now I realize that they use
different partitioning schemes.

Thanks Takeshi.



On Tue, Feb 9, 2016 at 9:09 PM, Takeshi Yamamuro <li...@gmail.com>
wrote:

> Hi,
>
> DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
> `HashPartitioning`.
> `RangePartitioning` roughly samples the input data and internally computes
> partition bounds to split the given rows into `spark.sql.shuffle.partitions`
> partitions. Therefore, when the sort keys are highly skewed, I think some
> partitions could end up being empty (that is, the number of result
> partitions is lower than `spark.sql.shuffle.partitions`).

Re: Optimal way to re-partition from a single partition

Posted by Takeshi Yamamuro <li...@gmail.com>.
Hi,

DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
`HashPartitioning`.
`RangePartitioning` roughly samples the input data and internally computes
partition bounds to split the given rows into `spark.sql.shuffle.partitions`
partitions. Therefore, when the sort keys are highly skewed, I think some
partitions could end up being empty (that is, the number of result
partitions is lower than `spark.sql.shuffle.partitions`).


On Tue, Feb 9, 2016 at 9:35 PM, Hemant Bhanawat <he...@gmail.com>
wrote:

> For SQL shuffle operations like groupBy, the number of output partitions
> is controlled by spark.sql.shuffle.partitions. But it seems orderBy does
> not honour this.
>
> In my small test, I could see that the number of partitions in the DF
> returned by orderBy was equal to the total number of distinct keys. Are you
> observing the same? I mean, do you have a single value for all rows in the
> column on which you are running orderBy? If yes, you are better off not
> running the orderBy clause.
>
> Maybe someone from the Spark SQL team could explain how the partitioning
> of the output DF should be handled when doing an orderBy?
>
> Hemant
> www.snappydata.io
> https://github.com/SnappyDataInc/snappydata


-- 
---
Takeshi Yamamuro
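
The behaviour described above (bounds computed from a sample, so heavily
repeated sort keys leave partitions empty) can be illustrated with a small
stand-alone Scala sketch. This is a simplified model and not Spark's actual
`RangePartitioning` code: the names `RangePartitionSketch`, `computeBounds`,
`partitionOf` and `nonEmptyPartitions` are hypothetical, the full key set
stands in for the sample, and repeated bounds are simply dropped.

```scala
object RangePartitionSketch {
  // Pick up to (numPartitions - 1) upper bounds at evenly spaced positions
  // of the sorted sample, keeping each distinct value only once.
  def computeBounds(sample: Seq[Int], numPartitions: Int): Vector[Int] = {
    val sorted = sample.sorted.toVector
    if (sorted.isEmpty || numPartitions <= 1) Vector.empty
    else {
      val candidates = (1 until numPartitions).map { i =>
        sorted(math.min(sorted.length - 1, i * sorted.length / numPartitions))
      }
      candidates.distinct.toVector
    }
  }

  // A key goes to the first partition whose upper bound is >= the key;
  // keys larger than every bound go to the last partition.
  def partitionOf(key: Int, bounds: Vector[Int]): Int = {
    val idx = bounds.indexWhere(key <= _)
    if (idx >= 0) idx else bounds.length
  }

  // Number of partitions that actually receive at least one row.
  def nonEmptyPartitions(keys: Seq[Int], numPartitions: Int): Int = {
    val bounds = computeBounds(keys, numPartitions)
    keys.map(partitionOf(_, bounds)).distinct.size
  }
}
```

In this model, 1000 rows that all share one sort key end up in a single
partition even when 200 partitions are requested, and a column with three
distinct keys fills three partitions, which is consistent with what was
reported in this thread.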

Re: Optimal way to re-partition from a single partition

Posted by Hemant Bhanawat <he...@gmail.com>.
For SQL shuffle operations like groupBy, the number of output partitions is
controlled by spark.sql.shuffle.partitions. But it seems orderBy does not
honour this.

In my small test, I could see that the number of partitions in the DF returned
by orderBy was equal to the total number of distinct keys. Are you
observing the same? I mean, do you have a single value for all rows in the
column on which you are running orderBy? If yes, you are better off not
running the orderBy clause.

Maybe someone from the Spark SQL team could explain how the partitioning
of the output DF should be handled when doing an orderBy?

Hemant
www.snappydata.io
https://github.com/SnappyDataInc/snappydata




On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores <ce...@gmail.com> wrote:

>
> I have a DataFrame which I sort using the orderBy function. This operation
> causes my DataFrame to end up in a single partition. After using those
> results, I would like to repartition to a larger number of partitions.
> Currently I am just doing:
>
> // df is a DataFrame with a single partition and around 14 million records
> val rdd = df.rdd.coalesce(100, true)
> val newDF = hc.createDataFrame(rdd, df.schema)
>
> This process is really slow. Is there any other way of achieving this
> task, or of optimizing it (perhaps by tweaking a Spark configuration parameter)?
>
>
> Thanks a lot
> --
> Cesar Flores
>
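
For contrast with orderBy, the hash-style partitioning used by operations
such as groupBy can be modelled in plain Scala. This is only a sketch under
stated assumptions: Spark SQL uses its own hash function rather than
`hashCode`, and the names `HashPartitionSketch`, `partitionOf` and
`partitionSizes` are hypothetical. It shows that the partition count stays
fixed at the configured value even when only a few partitions receive rows.

```scala
object HashPartitionSketch {
  // Map a key to one of numPartitions buckets using a non-negative
  // modulus of its hash code.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Row count per partition; the result always has numPartitions slots,
  // and slots that receive no keys stay at zero.
  def partitionSizes(keys: Seq[Any], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionOf(k, numPartitions)) += 1)
    counts.toVector
  }
}
```

Calling `HashPartitionSketch.partitionSizes(keys, 200)` always returns 200
entries, so a single repeated key yields one full partition and 199 empty
ones rather than a smaller partition count.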

Re: Optimal way to re-partition from a single partition

Posted by Takeshi Yamamuro <li...@gmail.com>.
Hi,

Please use DataFrame#repartition.

On Tue, Feb 9, 2016 at 7:30 AM, Cesar Flores <ce...@gmail.com> wrote:

>
> I have a DataFrame which I sort using the orderBy function. This operation
> causes my DataFrame to end up in a single partition. After using those
> results, I would like to repartition to a larger number of partitions.
> Currently I am just doing:
>
> // df is a DataFrame with a single partition and around 14 million records
> val rdd = df.rdd.coalesce(100, true)
> val newDF = hc.createDataFrame(rdd, df.schema)
>
> This process is really slow. Is there any other way of achieving this
> task, or of optimizing it (perhaps by tweaking a Spark configuration parameter)?
>
>
> Thanks a lot
> --
> Cesar Flores
>



-- 
---
Takeshi Yamamuro
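
The suggestion above could look roughly like the following. This is a minimal
sketch, assuming a local Spark 1.x-style setup with `SQLContext` and Spark on
the classpath; the object name `RepartitionSketch`, the helper `spread`, and
the sample data are hypothetical and not taken from the thread. Note that
`DataFrame#repartition` shuffles the rows, so the ordering produced by
orderBy should not be relied on afterwards.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object RepartitionSketch {
  // Redistribute rows over numPartitions partitions without converting
  // the DataFrame to an RDD[Row] and back.
  def spread(df: DataFrame, numPartitions: Int): DataFrame =
    df.repartition(numPartitions)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("repartition-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Hypothetical stand-in for the sorted DataFrame from the question.
    val df = sc.parallelize(1 to 1000).map(i => (i % 10, i)).toDF("key", "value")
    val sorted = df.orderBy("key")
    val newDF = spread(sorted, 100)

    println(s"partitions after orderBy:     ${sorted.rdd.partitions.length}")
    println(s"partitions after repartition: ${newDF.rdd.partitions.length}")
    sc.stop()
  }
}
```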