Posted to user@spark.apache.org by David Diebold <da...@gmail.com> on 2022/02/21 10:00:58 UTC

Question about spark.sql min_by

Hello all,

I'm trying to use the spark.sql min_by aggregation function with pyspark.
I'm using this distribution of Spark: spark-3.2.1-bin-hadoop3.2

I have a dataframe made of these columns:
- productId : int
- sellerId : int
- price : double

For each product, I want to get the seller who sells the product for the
cheapest price.

A naive approach would be to do this, but I would expect two shuffles (one
for the groupby, one for the join):

import pyspark.sql.functions as F

# Find the cheapest price per product, then join back to recover the seller(s).
cheapest_prices_df = df.groupby('productId').agg(F.min('price').alias('price'))
cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId', 'price'])

I would have loved to do this instead:

import pyspark.sql.functions as F

cheapest_sellers_df = df.groupby('productId').agg(
    F.min('price'), F.min_by('sellerId', 'price'))

Unfortunately, min_by does not seem to be available in pyspark.sql.functions,
although I can see it in the SQL functions doc:
https://spark.apache.org/docs/latest/api/sql/index.html

I have managed to use min_by with the approach below, but it looks slow
(maybe because of the temp view creation?):

df.createOrReplaceTempView("table")
cheapest_sellers_df = spark.sql(
    "select min_by(sellerId, price) sellerId, min(price) from table group by productId")

Is there a way I can use min_by directly in the groupby aggregation?
Is there some code missing in the pyspark wrapper that would make min_by
visible somehow?

Thank you in advance for your help.

Cheers
David

Re: Question about spark.sql min_by

Posted by Mich Talebzadeh <mi...@gmail.com>.
I gave a similar answer about windowing functions in the thread "add an
auto_increment column" dated 7th February:

https://lists.apache.org/list.html?user@spark.apache.org

HTH

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: Question about spark.sql min_by

Posted by David Diebold <da...@gmail.com>.
Thank you for your answers.
Indeed, windowing should help there.
Also, I just realized I could try creating a struct column with both price
and sellerId and applying min() to it; the ordering would consider price
first (https://stackoverflow.com/a/52669177/2015762).
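
Something like this rough, untested sketch is what I have in mind (using the
column names from my first mail):

import pyspark.sql.functions as F

# min() over a struct orders by its first field (price), so the sellerId of
# the cheapest row comes along in the same aggregation.
cheapest_sellers_df = (
    df.groupBy('productId')
      .agg(F.min(F.struct('price', 'sellerId')).alias('cheapest'))
      .select('productId', 'cheapest.price', 'cheapest.sellerId')
)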

Cheers!

Re: Question about spark.sql min_by

Posted by ayan guha <gu...@gmail.com>.
Why can this not be done with a window function? Or is min_by just a
shorthand for that?
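
If a window works for you, this is the kind of rough, untested sketch I mean
(using your column names):

import pyspark.sql.functions as F
from pyspark.sql import Window

# Rank rows per product by price and keep the cheapest one.
w = Window.partitionBy('productId').orderBy(F.col('price').asc())
cheapest_sellers_df = (
    df.withColumn('rn', F.row_number().over(w))
      .filter(F.col('rn') == 1)
      .drop('rn')
)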

Best Regards,
Ayan Guha

Re: Question about spark.sql min_by

Posted by Sean Owen <sr...@gmail.com>.
From the source code, it looks like this function was added to pyspark in
Spark 3.3, which is up for release soon. It does exist in SQL, so you can
still use it from Python with `spark.sql(...)`; not hard.
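
You don't even need a temp view; since min_by exists as a SQL function, you
should be able to call it through expr() from the DataFrame API. A sketch,
assuming your column names:

import pyspark.sql.functions as F

# expr() lets you use the SQL min_by function inside a DataFrame aggregation,
# even before the F.min_by wrapper ships in Spark 3.3.
cheapest_sellers_df = df.groupBy('productId').agg(
    F.expr('min_by(sellerId, price)').alias('sellerId'),
    F.min('price').alias('price')
)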
