Posted to user@spark.apache.org by Pedro Rodriguez <sk...@gmail.com> on 2016/07/08 19:57:10 UTC

DataFrame Min By Column

Is there a way, on a GroupedData (from groupBy on a DataFrame), to have an
aggregate that returns column A based on the min of column B? For example, I
have a list of sites visited by a given user, and I would like to find the
event with the minimum time (the first event).

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience

Re: DataFrame Min By Column

Posted by Michael Armbrust <mi...@databricks.com>.
I would guess that using the built-in min/max/struct functions will be much
faster than a UDAF.  They should have native internal implementations that
utilize code generation.

On Sat, Jul 9, 2016 at 2:20 PM, Pedro Rodriguez <sk...@gmail.com>
wrote:

> Thanks Michael,
>
> That seems like the analog to sorting tuples. I am curious, is there a
> significant performance penalty to the UDAF versus that? It's certainly
> nicer and more compact code at least.
>
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> <https://www.linkedin.com/in/pedrorodriguezscience>

Re: DataFrame Min By Column

Posted by Pedro Rodriguez <sk...@gmail.com>.
Thanks Michael,

That seems like the analog of sorting tuples. I am curious: is there a significant performance penalty to the UDAF versus that? It's certainly nicer and more compact code, at least.
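
For readers following the "sorting tuples" analogy, here is a minimal sketch in plain Scala collections (no Spark). The Visit case class, its field names, and the TupleArgMin object are assumptions made up for illustration; they do not come from the thread.

```scala
// Hypothetical event record; the field names are assumptions for illustration.
final case class Visit(user: String, time: Long, site: String)

object TupleArgMin {
  // For each user, return the site of the visit with the smallest time.
  // Tuples are ordered lexicographically, so putting time first makes min
  // pick the earliest visit; site only breaks ties between equal times.
  def firstSiteByUser(visits: Seq[Visit]): Map[String, String] =
    visits
      .groupBy(_.user)
      .map { case (user, userVisits) =>
        val (_, site) = userVisits.map(v => (v.time, v.site)).min
        user -> site
      }
}
```

Taking the min of (time, site) pairs is the same trick as taking the min of a struct whose first field is the timestamp.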

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 2:19:11 PM, Michael Armbrust (michael@databricks.com) wrote:

You can do what's called an argmax/argmin, where you take the min/max of a couple of columns that have been grouped together as a struct.  We sort in column order, so you can put the timestamp first.

Here is an example.


Re: DataFrame Min By Column

Posted by Michael Armbrust <mi...@databricks.com>.
You can do what's called an *argmax/argmin*, where you take the min/max of a
couple of columns that have been grouped together as a struct.  We sort in
column order, so you can put the timestamp first.

Here is an example:
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3170497669323442/2840265927289860/latest.html>
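
In case the linked notebook is unavailable, the struct-based argmin described above might look roughly like the sketch below. This is not the notebook's code. The column names "user", "time" and "site" and the StructArgMin object are assumptions for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, min, struct}

object StructArgMin {
  // Assumes df has columns "user", "time" and "site" (illustrative names).
  def firstEventPerUser(df: DataFrame): DataFrame =
    df.groupBy("user")
      // Structs are compared field by field, so "time" must come first
      // for min to select the earliest event.
      .agg(min(struct(col("time"), col("site"))).as("first"))
      // Unpack the winning struct back into ordinary columns.
      .select(col("user"), col("first.time").as("time"), col("first.site").as("site"))
}
```

Any further columns you want returned can be appended to the struct after the timestamp; they then only influence the result when timestamps tie.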

On Sat, Jul 9, 2016 at 6:10 AM, Pedro Rodriguez <sk...@gmail.com>
wrote:

> I implemented a more generic version which I posted here:
> https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a
>
> I think I could generalize this by pattern matching on DataType to use
> different getLong/getDouble/etc. functions (I am not trying to use getAs[]
> because getting T from Array[T] seems hard).
>
> Is there a way to go further and make the arguments unnecessary or
> inferable at runtime, particularly for the valueType since it doesn’t
> matter what it is? DataType is abstract, so I can’t instantiate it. Is there
> a way to define the method so that it pulls from the user input at runtime?
>
> Thanks,
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> <https://www.linkedin.com/in/pedrorodriguezscience>

Re: DataFrame Min By Column

Posted by Pedro Rodriguez <sk...@gmail.com>.
I implemented a more generic version which I posted here: https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a

I think I could generalize this by pattern matching on DataType to use different getLong/getDouble/etc. functions (I am not trying to use getAs[] because getting T from Array[T] seems hard).

Is there a way to go further and make the arguments unnecessary or inferable at runtime, particularly for the valueType since it doesn’t matter what it is? DataType is abstract, so I can’t instantiate it. Is there a way to define the method so that it pulls from the user input at runtime?
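
To make the idea of dispatching on DataType concrete, here is a rough sketch of a UDAF that takes both types as constructor arguments. It is not the code from the gist. The class name MinBy, the field names, and the set of supported ordering types are assumptions chosen for illustration. It uses UserDefinedAggregateFunction, the UDAF API available when this thread was written, which Spark 3.0 later deprecated in favor of Aggregator.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical class: returns the `value` seen with the smallest `order`.
class MinBy(valueType: DataType, orderType: DataType) extends UserDefinedAggregateFunction {

  private val schema = StructType(Seq(
    StructField("value", valueType),
    StructField("order", orderType)))

  override def inputSchema: StructType = schema
  override def bufferSchema: StructType = schema
  override def dataType: DataType = valueType
  override def deterministic: Boolean = true

  // Dispatch the comparison on the runtime DataType of the ordering column.
  private def lessThan(a: Any, b: Any): Boolean = orderType match {
    case IntegerType   => a.asInstanceOf[Int] < b.asInstanceOf[Int]
    case LongType      => a.asInstanceOf[Long] < b.asInstanceOf[Long]
    case DoubleType    => a.asInstanceOf[Double] < b.asInstanceOf[Double]
    case StringType    => a.asInstanceOf[String].compareTo(b.asInstanceOf[String]) < 0
    case TimestampType =>
      a.asInstanceOf[java.sql.Timestamp].before(b.asInstanceOf[java.sql.Timestamp])
    case other         =>
      throw new IllegalArgumentException(s"Unsupported ordering type: $other")
  }

  // Start with an empty buffer; null in slot 1 means "nothing seen yet".
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = null
    buffer(1) = null
  }

  // Keep the candidate if the buffer is empty or the candidate orders first.
  // Rows whose ordering value is null are ignored.
  private def keepSmaller(buffer: MutableAggregationBuffer, candidate: Row): Unit =
    if (!candidate.isNullAt(1) &&
        (buffer.isNullAt(1) || lessThan(candidate.get(1), buffer.get(1)))) {
      buffer(0) = candidate.get(0)
      buffer(1) = candidate.get(1)
    }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    keepSmaller(buffer, input)

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    keepSmaller(buffer1, buffer2)

  override def evaluate(buffer: Row): Any = buffer.get(0)
}
```

With this sketch, something like `new MinBy(StringType, LongType)` applied as `minBy(col("site"), col("time"))` inside `agg` would return the site with the smallest time per group. The types still have to be passed in by the caller, so it does not answer the question of inferring them at runtime.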

Thanks,
—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 9, 2016 at 1:33:18 AM, Pedro Rodriguez (ski.rodriguez@gmail.com) wrote:

Hi Xinh,

A co-worker also found that solution, but I thought it was possibly overkill/brittle, so I looked into UDAFs (user-defined aggregate functions). I don’t have code, but Databricks has a post with an example (https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html). From that, I was able to write a MinLongByTimestamp function, but was having a hard time writing a generic aggregate that returns any column by an orderable column.

Does anyone know how you might go about using generics in a UDAF, or something that would mimic union types to express that orderable Spark SQL types are allowed?

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn


Re: DataFrame Min By Column

Posted by Pedro Rodriguez <sk...@gmail.com>.
Hi Xinh,

A co-worker also found that solution, but I thought it was possibly overkill/brittle, so I looked into UDAFs (user-defined aggregate functions). I don’t have code, but Databricks has a post with an example (https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html). From that, I was able to write a MinLongByTimestamp function, but was having a hard time writing a generic aggregate that returns any column by an orderable column.

Does anyone know how you might go about using generics in a UDAF, or something that would mimic union types to express that orderable Spark SQL types are allowed?

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 8, 2016 at 6:06:32 PM, Xinh Huynh (xinh.huynh@gmail.com) wrote:

Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window function, partitioned on user and ordered by time:

// Assuming "df" holds your dataframe ...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val wSpec = Window.partitionBy("user").orderBy("time")
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)

Xinh


Re: DataFrame Min By Column

Posted by Xinh Huynh <xi...@gmail.com>.
Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window
function, partitioned on user and ordered by time:

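// Assuming "df" holds your DataFrame; the $"..." syntax also needs the
// implicits import from your SQLContext (e.g. import sqlContext.implicits._).

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// One window per user, with rows ordered by time (earliest first)
val wSpec = Window.partitionBy("user").orderBy("time")
// rank() is 1 for the earliest row; rows that tie on time all get rank 1
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)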

Xinh
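
A possible variant of the window approach, sketched under the same assumptions about the "user" and "time" column names: row_number() instead of rank() yields exactly one row per user even when times tie, and withColumn keeps every original column. The FirstRowPerUser object and the temporary "rn" column name are made up for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object FirstRowPerUser {
  // Assumes df has "user" and "time" columns; all other columns are kept.
  def firstRowPerUser(df: DataFrame): DataFrame = {
    val byUser = Window.partitionBy("user").orderBy("time")
    df.withColumn("rn", row_number().over(byUser)) // 1, 2, 3, ... within each user
      .where(col("rn") === 1)                       // exactly one row per user
      .drop("rn")                                   // remove the helper column
  }
}
```

When several rows share the minimum time, which of them is returned is not defined unless more columns are added to the orderBy.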

On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez <sk...@gmail.com>
wrote:

> Is there a way, on a GroupedData (from groupBy on a DataFrame), to have an
> aggregate that returns column A based on the min of column B? For example, I
> have a list of sites visited by a given user, and I would like to find the
> event with the minimum time (the first event).
>
> Thanks,
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>