Posted to dev@spark.apache.org by Nicholas Chammas <ni...@gmail.com> on 2021/12/14 02:43:46 UTC

Re: Creating a memory-efficient AggregateFunction to calculate Median

No takers here? :)

I can see now why a median function is not available in most data
processing systems. It's pretty annoying to implement!

On Thu, Dec 9, 2021 at 9:25 PM Nicholas Chammas <ni...@gmail.com>
wrote:

> I'm trying to create a new aggregate function. It's my first time working
> with Catalyst, so it's exciting---but I'm also in a bit over my head.
>
> My goal is to create a function to calculate the median
> <https://issues.apache.org/jira/browse/SPARK-26589>.
>
> As a very simple solution, I could just define median to be an alias of
> `Percentile(col, 0.5)`. However, the leading comment on the Percentile
> expression
> <https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L37-L39>
> highlights that it's very memory-intensive and can easily lead to
> OutOfMemory errors.
>
> So instead of using Percentile, I'm trying to create an Expression that
> calculates the median without needing to hold everything in memory at once.
> I'm considering two different approaches:
>
> 1. Define Median as a combination of existing expressions: The median can
> perhaps be built out of the existing expressions for Count
> <https://github.com/apache/spark/blob/9af338cd685bce26abbc2dd4d077bde5068157b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala#L48>
> and NthValue
> <https://github.com/apache/spark/blob/568ad6aa4435ce76ca3b5d9966e64259ea1f9b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L675>.
>
> I don't see a template I can follow for building a new expression out of
> existing expressions (i.e. without having to implement a bunch of methods
> for DeclarativeAggregate or ImperativeAggregate). I also don't know how I
> would wrap NthValue to make it usable as a regular aggregate function. The
> wrapped NthValue would need an implicit window that provides the necessary
> ordering.
>
>
> Is there any potential to this idea? Any pointers on how to implement it?
>
>
> 2. Another memory-light approach to calculating the median requires
> multiple passes over the data to converge on the answer. The approach is described
> here
> <https://www.quora.com/Distributed-Algorithms/What-is-the-distributed-algorithm-to-determine-the-median-of-arrays-of-integers-located-on-different-computers>.
> (I posted a sketch implementation of this approach using Spark's user-level
> API here
> <https://issues.apache.org/jira/browse/SPARK-26589?focusedCommentId=17452081&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17452081>.)
>
> I am also struggling to understand how I would build an aggregate function
> like this, since it requires multiple passes over the data. From what I can
> see, Catalyst's aggregate functions are designed to work with a single pass
> over the data.
>
> We don't seem to have an interface for AggregateFunction that supports
> multiple passes over the data. Is there some way to do this?
>
>
> Again, this is my first serious foray into Catalyst. Any specific
> implementation guidance is appreciated!
>
> Nick
>
>
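
For readers following along, the multi-pass idea in approach 2 can be
sketched outside of Spark in plain Python. This is only an illustration
under stated assumptions, not the algorithm from the linked Quora answer or
the JIRA comment: it assumes integer data, returns the lower median, and
narrows the answer by bisecting the value range. The names
`multipass_median` and `scan` are made up for this sketch.

```python
from typing import Callable, Iterable


def multipass_median(scan: Callable[[], Iterable[int]]) -> int:
    """Return the lower median of the integers produced by scan().

    scan() is a hypothetical callback that must return a fresh iterator over
    the same data on every call; each call is one full pass over the data.
    """
    # Pass 1: row count plus value range, using constant memory.
    n, lo, hi = 0, None, None
    for x in scan():
        n += 1
        lo = x if lo is None or x < lo else lo
        hi = x if hi is None or x > hi else hi
    if n == 0:
        raise ValueError("median of empty data is undefined")

    k = (n + 1) // 2  # 1-based rank of the lower median

    # Invariant: the answer is the smallest v with count(x <= v) >= k,
    # and it always lies in [lo, hi].
    while lo < hi:
        mid = (lo + hi) // 2
        at_most_mid = sum(1 for x in scan() if x <= mid)  # one more pass
        if at_most_mid >= k:
            hi = mid
        else:
            lo = mid + 1
    return lo


data = [5, 1, 9, 3, 7]
print(multipass_median(lambda: iter(data)))
```

Memory stays constant, but the number of passes grows with the logarithm of
the value range, which is exactly the property that does not fit an
aggregate interface built around a single pass.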

Re: Creating a memory-efficient AggregateFunction to calculate Median

Posted by Sean Owen <sr...@gmail.com>.
Parquet and ORC already have the necessary stats to make this fast too, but
that only helps if you want the median of data that is stored sorted on
disk, rather than the general case. I'm not sure you can do better than
roughly what a sort entails if you want the exact median.
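
As a point of reference, the sort-based baseline can be written as a small
Python sketch. It is not Spark code: `sort_based_median` is a hypothetical
name, and averaging the two middle values for an even count is an assumption
of this sketch. It pairs a count with an "nth value" lookup over ordered
data, which is the shape of the Count + NthValue idea from the original
message.

```python
from typing import Iterable


def sort_based_median(values: Iterable[float]) -> float:
    """Exact median via a full sort (the cost that is hard to avoid)."""
    ordered = sorted(values)  # the expensive step: O(n log n), all rows held
    n = len(ordered)          # plays the role of Count
    if n == 0:
        raise ValueError("median of empty data is undefined")
    mid = n // 2
    if n % 2 == 1:
        return float(ordered[mid])  # plays the role of NthValue
    # Even count: average the two middle values (an assumption of this sketch).
    return (ordered[mid - 1] + ordered[mid]) / 2.0


print(sort_based_median([5, 1, 9, 3, 7]))
print(sort_based_median([4, 1, 3, 2]))
```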

On Wed, Dec 15, 2021, 8:56 AM Pol Santamaria <po...@qbeast.io> wrote:

> Correct me if I am wrong, but if the dataset were indexed by the given
> column, you could get the median without reading the whole dataset,
> shuffling, and so on. (Disclaimer: I work at Qbeast.) So the issue is more
> about the data format and the possibility of pushing the operation down to
> the data source.
>
> On our side, we are working on an open data format that supports indexing
> and efficient sampling on data lakes (Qbeast Format), but I also know about
> other initiatives (Microsoft Hyperspace) to allow consuming indexed
> datasets with Apache Spark.
>
> If you are interested in experimenting with the median aggregate, I have
> some ideas on how to implement it for the Spark data source of Qbeast
> Format in an efficient way.
>
> [Qbeast-spark] https://github.com/Qbeast-io/qbeast-spark
> [Microsoft Hyperspace] https://github.com/microsoft/hyperspace
>
> Bests,
>
> Pol Santamaria

Re: Creating a memory-efficient AggregateFunction to calculate Median

Posted by Pol Santamaria <po...@qbeast.io>.
Correct me if I am wrong, but if the dataset were indexed by the given
column, you could get the median without reading the whole dataset,
shuffling, and so on. (Disclaimer: I work at Qbeast.) So the issue is more
about the data format and the possibility of pushing the operation down to
the data source.

On our side, we are working on an open data format that supports indexing
and efficient sampling on data lakes (Qbeast Format), but I also know about
other initiatives (Microsoft Hyperspace) to allow consuming indexed
datasets with Apache Spark.

If you are interested in experimenting with the median aggregate, I have
some ideas on how to implement it for the Spark data source of Qbeast
Format in an efficient way.

[Qbeast-spark] https://github.com/Qbeast-io/qbeast-spark
[Microsoft Hyperspace] https://github.com/microsoft/hyperspace

Bests,

Pol Santamaria


On Tue, Dec 14, 2021 at 4:42 AM Nicholas Chammas <ni...@gmail.com>
wrote:

> Yeah, I think approximate percentile is good enough most of the time.
>
> I don't have a specific need for a precise median. I was interested in
> implementing it more as a Catalyst learning exercise, but it turns out I
> picked a bad learning exercise to solve. :)

Re: Creating a memory-efficient AggregateFunction to calculate Median

Posted by Nicholas Chammas <ni...@gmail.com>.
Yeah, I think approximate percentile is good enough most of the time.

I don't have a specific need for a precise median. I was interested in
implementing it more as a Catalyst learning exercise, but it turns out I
picked a bad learning exercise to solve. :)

On Mon, Dec 13, 2021 at 9:46 PM Reynold Xin <rx...@databricks.com> wrote:

> tl;dr: there's no easy way to implement aggregate expressions that'd
> require multiple passes over the data. It is simply not something that's
> supported, and doing so would be very costly.
>
> Would you be OK using approximate percentile? That's relatively cheap.

Re: Creating a memory-efficient AggregateFunction to calculate Median

Posted by Reynold Xin <rx...@databricks.com>.
tl;dr: there's no easy way to implement aggregate expressions that'd
require multiple passes over the data. It is simply not something that's
supported, and doing so would be very costly.

Would you be OK using approximate percentile? That's relatively cheap.
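
To illustrate why an approximate answer is cheap, here is a minimal
single-pass sketch in plain Python. It does not mirror Spark's
implementation (in Spark you would reach for `percentile_approx` in SQL or
`DataFrame.approxQuantile`); it swaps in reservoir sampling because that
fits in a few lines. The name `approx_median` and its parameters are
hypothetical.

```python
import random
from typing import Iterable, List


def approx_median(values: Iterable[float], sample_size: int = 1000,
                  seed: int = 0) -> float:
    """Estimate the median in one pass using at most sample_size values."""
    rng = random.Random(seed)
    reservoir: List[float] = []
    for i, x in enumerate(values):
        if i < sample_size:
            reservoir.append(x)  # fill the reservoir first
        else:
            # Reservoir sampling: item i replaces a random slot with
            # probability sample_size / (i + 1).
            j = rng.randint(0, i)  # both ends inclusive
            if j < sample_size:
                reservoir[j] = x
    if not reservoir:
        raise ValueError("median of empty data is undefined")
    reservoir.sort()  # sorts only the bounded sample, not the data
    return reservoir[len(reservoir) // 2]


print(approx_median(range(100000)))
```

Memory is bounded by `sample_size` no matter how many rows are scanned, at
the price of an estimate whose error shrinks as the sample grows.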

On Mon, Dec 13, 2021 at 6:43 PM, Nicholas Chammas <nicholas.chammas@gmail.com> wrote:

> No takers here? :)
>
> I can see now why a median function is not available in most data
> processing systems. It's pretty annoying to implement!