Posted to dev@spark.apache.org by "assaf.mendelson" <as...@rsa.com> on 2016/11/23 12:36:39 UTC

Aggregating over sorted data

Hi,
An issue I have encountered frequently is the need to look at data in an ordered manner per key.
A common way of doing this can be seen in classic MapReduce, where the shuffle stage provides sorted data per key and one can therefore do a lot with that.
It is of course relatively easy to achieve this with RDDs, but that would mean converting to an RDD and back, which has a non-negligible performance penalty (beyond the fact that we lose any Catalyst optimization).
We can use SQL window functions, but that is not an ideal solution either. Beyond the fact that window functions are much slower than aggregate functions (as we need to generate a result for every record), we also can't combine them (i.e. if we have two window functions on the same window, it is still two separate scans).

Ideally, I would have liked to see something like: df.groupBy(col1).sortBy(col2).agg(...) and have the aggregations work on the sorted data. That would make it possible to use both the existing functions and UDAFs where we can assume the order (and do any windowing we need as part of the function itself, which is relatively easy in many cases).

I have tried to look for this and seen many questions on the subject but no answers.

I was hoping I had missed something (I have seen that the SQL CLUSTER BY clause repartitions and sorts accordingly, but as I understand it there is no promise that this remains true if we do a groupBy afterwards). If I didn't, I believe this is a feature worth adding (I can open a JIRA if people think it is a good idea).
Assaf.





--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Aggregating-over-sorted-data-tp19999.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Re: Aggregating over sorted data

Posted by nsyca <ns...@gmail.com>.
Hi,

SPARK-18591 <https://issues.apache.org/jira/browse/SPARK-18591> might be a
solution to your problem, but making assumptions in your UDAF logic about how
Spark will process the aggregation is really risky. Is there a way to do
it using a window function with an ORDER BY clause to enforce the processing
order in this case?
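
A rough sketch of what the window-function route could look like, under some assumptions: the column names `key`, `ts` and `value` are made up for illustration, and `Window.unboundedPreceding` / `Window.unboundedFollowing` require Spark 2.1 or later. The window is ordered by `ts` and its frame covers the whole partition, so every row of a key carries the complete ordered list; one row per key is then kept. As noted earlier in the thread, this computes a result for every record and is likely slower than a plain aggregation.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list}

object WindowOrdered {
  /** For each `key`, collects `value` in `ts` order using an ordered window. */
  def orderedLists(df: DataFrame): DataFrame = {
    // Frame spans the whole partition so every row sees the full ordered list.
    val w = Window
      .partitionBy("key")
      .orderBy("ts")
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    df.select(col("key"), collect_list(col("value")).over(w).as("values"))
      .dropDuplicates("key") // all rows of a key hold the same list; keep one
  }
}
```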




-----
Nattavut Sutyanyong | @nsyca
Spark Technology Center
http://www.spark.tc/

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: Aggregating over sorted data

Posted by Koert Kuipers <ko...@tresata.com>.
yes it's less optimal because an abstraction is missing and with
mapPartitions it is done without optimizations. but aggregator is not the
right abstraction to begin with, it assumes a monoid which means no
ordering guarantees. you need a fold operation.
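
To illustrate the distinction with plain Scala collections (no Spark involved; `countSessions`, `Sessions` and `maxGap` are hypothetical names for this sketch): a sum can be merged from partial results in any order, while counting sessions in a stream of timestamps is a fold that only gives the intended answer when the elements arrive sorted.

```scala
object FoldVsMonoid {
  // Monoid-style aggregation: partial sums can be combined in any order.
  def sum(partials: Seq[Long]): Long = partials.foldLeft(0L)(_ + _)

  // State carried through the ordered fold: last timestamp seen and sessions so far.
  final case class Sessions(last: Option[Long], count: Int)

  /** Counts sessions, starting a new one whenever the gap to the previous
    * timestamp exceeds `maxGap`. Only meaningful on sorted input. */
  def countSessions(timestamps: Iterator[Long], maxGap: Long): Int =
    timestamps.foldLeft(Sessions(None, 0)) { (acc, ts) =>
      val startsNew = acc.last.forall(prev => ts - prev > maxGap)
      Sessions(Some(ts), if (startsNew) acc.count + 1 else acc.count)
    }.count
}
```

Feeding the same timestamps in a different order changes the session count, which is exactly the property a merge-based aggregator cannot express.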

On Dec 22, 2016 02:20, "Liang-Chi Hsieh" <vi...@gmail.com> wrote:

>
> You can't use existing aggregation functions with that. Besides, the
> execution plan of `mapPartitions` doesn't support whole-stage codegen.
> Without that and some optimization around aggregation, there may be
> performance degradation. Also, when you have more than one key in a
> partition, you will need to take care of that in the function applied to
> each partition.

Re: Aggregating over sorted data

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
You can't use existing aggregation functions with that. Besides, the
execution plan of `mapPartitions` doesn't support whole-stage codegen.
Without that and some optimization around aggregation, there may be
performance degradation. Also, when you have more than one key in a
partition, you will need to take care of that in the function applied to
each partition.
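
A minimal sketch of the "more than one key per partition" bookkeeping, in plain Scala and independent of Spark (the `foldByKey` helper is hypothetical, not a Spark API). It assumes the iterator is already sorted so that equal keys are adjacent, and folds each run of equal keys lazily:

```scala
object SortedPartition {
  /** Folds every run of consecutive equal keys in a key-sorted iterator,
    * lazily emitting one (key, result) pair per run. */
  def foldByKey[K, V, B](rows: Iterator[(K, V)])(zero: B)(op: (B, V) => B): Iterator[(K, B)] = {
    val it = rows.buffered // lets us peek at the next key without consuming it
    new Iterator[(K, B)] {
      def hasNext: Boolean = it.hasNext
      def next(): (K, B) = {
        val key = it.head._1
        var acc = zero
        // Consume rows until the key changes or the partition ends.
        while (it.hasNext && it.head._1 == key) acc = op(acc, it.next()._2)
        (key, acc)
      }
    }
  }
}
```

For example, `SortedPartition.foldByKey(Iterator((1, "a"), (1, "b"), (2, "c")))("")(_ + _)` yields one pair per key. A function of this shape is what would be passed to `mapPartitions`.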


Koert Kuipers wrote
> it can also be done with repartition + sortWithinPartitions +
> mapPartitions.
> perhaps not as convenient but it does not rely on undocumented behavior.
> i used this approach in spark-sorted. see here:
> https://github.com/tresata/spark-sorted/blob/master/src/main/scala/com/tresata/spark/sorted/sql/GroupSortedDataset.scala

-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 


Re: Aggregating over sorted data

Posted by tr...@gmail.com.
I would love this feature

On Thu, 22 Dec 2016, 18:45 assaf.mendelson, <as...@rsa.com> wrote:

> It seems that this aggregation is for Dataset operations only. I would
> have hoped to be able to do DataFrame aggregation, something along the lines
> of: sort_df(df).agg(my_agg_func)
>
>
>
> In any case, note that this kind of sorting is less efficient than, for
> example, the sorting done in window functions. Specifically, what happens
> here is that the data is first shuffled and then the entire partition
> is sorted. It is possible to do it another way (although I have no idea how
> to do it in Spark without writing a UDAF, which is probably very
> inefficient): collect everything by key in each partition, sort within
> each key (which would be a lot faster since there are fewer elements),
> and then merge the results.
>
>
>
> I was hoping to find something like: Efficient sortByKey to work with…

RE: Aggregating over sorted data

Posted by "assaf.mendelson" <as...@rsa.com>.
It seems that this aggregation is for Dataset operations only. I would have hoped to be able to do DataFrame aggregation, something along the lines of: sort_df(df).agg(my_agg_func)

In any case, note that this kind of sorting is less efficient than, for example, the sorting done in window functions. Specifically, what happens here is that the data is first shuffled and then the entire partition is sorted. It is possible to do it another way (although I have no idea how to do it in Spark without writing a UDAF, which is probably very inefficient): collect everything by key in each partition, sort within each key (which would be a lot faster since there are fewer elements), and then merge the results.
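
The alternative described above can be sketched on plain Scala collections, with each `Seq` standing in for one partition (all names here are hypothetical, and this is an illustration of the idea rather than anything Spark does internally): sort each key's values locally, then merge the already-sorted runs across partitions.

```scala
object PerKeySort {
  /** Step 1, per partition: group rows by key and sort each key's values. */
  def sortWithinKeys[K, V: Ordering](partition: Seq[(K, V)]): Map[K, Vector[V]] =
    partition.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).sorted.toVector }

  /** Merges two sorted runs into one sorted run in linear time. */
  def mergeSorted[V](a: Vector[V], b: Vector[V])(implicit ord: Ordering[V]): Vector[V] = {
    val out = Vector.newBuilder[V]
    var i = 0
    var j = 0
    while (i < a.length && j < b.length) {
      if (ord.lteq(a(i), b(j))) { out += a(i); i += 1 }
      else { out += b(j); j += 1 }
    }
    out ++= a.drop(i)
    out ++= b.drop(j)
    out.result()
  }

  /** Step 2: merge the per-key sorted runs coming from all partitions. */
  def mergePartitions[K, V: Ordering](parts: Seq[Map[K, Vector[V]]]): Map[K, Vector[V]] =
    parts.foldLeft(Map.empty[K, Vector[V]]) { (acc, part) =>
      part.foldLeft(acc) { case (m, (k, run)) =>
        m.updated(k, m.get(k).fold(run)(mergeSorted(_, run)))
      }
    }
}
```

Each local sort touches only one key's elements, and the cross-partition step is a linear merge rather than a full sort.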

I was hoping to find something like: Efficient sortByKey to work with…


Re: Aggregating over sorted data

Posted by Koert Kuipers <ko...@tresata.com>.
it can also be done with repartition + sortWithinPartitions + mapPartitions.
perhaps not as convenient but it does not rely on undocumented behavior.
i used this approach in spark-sorted. see here:
https://github.com/tresata/spark-sorted/blob/master/src/main/scala/com/tresata/spark/sorted/sql/GroupSortedDataset.scala
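
A minimal sketch of the repartition + sortWithinPartitions + mapPartitions combination. This is not the spark-sorted implementation; the `Event` type, the `concatInOrder` name and the column names are assumptions made for illustration. Each key's values are concatenated in `ts` order by walking the sorted partition and detecting key boundaries.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object SortedFold {
  // Hypothetical record type used only for this illustration.
  final case class Event(key: Int, ts: Long, value: String)

  /** Concatenates each key's values in `ts` order. */
  def concatInOrder(spark: SparkSession, events: Dataset[Event]): Dataset[(Int, String)] = {
    import spark.implicits._
    events
      .repartition($"key")                 // all rows of a key land in one partition
      .sortWithinPartitions($"key", $"ts") // each partition is ordered by (key, ts)
      .mapPartitions { rows =>
        val it = rows.buffered
        new Iterator[(Int, String)] {
          def hasNext: Boolean = it.hasNext
          def next(): (Int, String) = {
            val key = it.head.key
            val sb = new StringBuilder
            // A partition may hold several keys, so stop when the key changes.
            while (it.hasNext && it.head.key == key) sb ++= it.next().value
            (key, sb.toString)
          }
        }
      }
  }
}
```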

On Wed, Dec 21, 2016 at 9:44 PM, Liang-Chi Hsieh <vi...@gmail.com> wrote:

>
> I agree that to make sure this works, you might need to know Spark's
> internal implementation of APIs such as `groupBy`.
>
> But without any further changes to the current Spark implementation, I think
> this is one possible way to aggregate over sorted data per key.

Re: Aggregating over sorted data

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
I agree that to make sure this works, you might need to know Spark's
internal implementation of APIs such as `groupBy`.

But without any further changes to the current Spark implementation, I think
this is one possible way to aggregate over sorted data per key.







Re: Aggregating over sorted data

Posted by Koert Kuipers <ko...@tresata.com>.
i think this works but it relies on groupBy and agg respecting the sorting.
the api provides no such guarantee, so this could break in future versions.
i would not rely on this i think...


Re: Aggregating over sorted data

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
Hi,

Can you try the combination of `repartition` + `sortWithinPartitions` on the
dataset?

E.g.,

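    // Needed outside spark-shell; assumes a SparkSession named `spark`.
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.collect_list
    import spark.implicits._

    val df = Seq((2, "b c a"), (1, "c a b"), (3, "a c b")).toDF("number", "letters")
    // Split each "letters" string into one row per letter (output column `_1`).
    val df2 =
      df.explode('letters) {
        case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
      }

    df2
      .select('number, '_1 as 'letter)
      // Co-locate each number's rows, then sort every partition by (number, letter).
      .repartition('number)
      .sortWithinPartitions('number, 'letter)
      .groupBy('number)
      .agg(collect_list('letter))
      .show()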

    +------+--------------------+
    |number|collect_list(letter)|
    +------+--------------------+
    |     3|           [a, b, c]|
    |     1|           [a, b, c]|
    |     2|           [a, b, c]|
    +------+--------------------+

I think this should let you aggregate over sorted data per key.






Re: Aggregating over sorted data

Posted by Koert Kuipers <ko...@tresata.com>.
take a look at:
https://issues.apache.org/jira/browse/SPARK-15798


On Dec 19, 2016 00:17, "Robin East" <ro...@xense.co.uk> wrote:

This is also a feature we need for our time-series processing




Re: Aggregating over sorted data

Posted by Robin East <ro...@xense.co.uk>.
This is also a feature we need for our time-series processing 



> On 19 Dec 2016, at 04:07, Liang-Chi Hsieh <vi...@gmail.com> wrote:
> 
> 
> Hi,
> 
> As far as I know, Spark SQL doesn't provide native support for this feature now.
> After searching, I found that only a few database systems support it, e.g.,
> PostgreSQL.
>
> Actually, based on Spark SQL's aggregate system, I think it is not very
> difficult to add support for this feature. The question is how frequently
> this feature is needed by Spark SQL users and whether it is worth adding,
> because as far as I can see, this feature is not very common.
>
> An alternative way to achieve this in current Spark SQL is to use an
> Aggregator with the Dataset API. You can write a custom Aggregator that has
> a user-defined JVM object as a buffer to hold the input data for your
> aggregate function. But you may need to write the necessary encoder for the
> buffer object.
>
> If you really need this feature, you may open a Jira to ask for others'
> opinions about it.


Re: Aggregating over sorted data

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
Hi,

As far as I know, Spark SQL doesn't provide native support for this feature now.
After searching, I found that only a few database systems support it, e.g.,
PostgreSQL.

Actually, based on Spark SQL's aggregate system, I think it is not very
difficult to add support for this feature. The question is how frequently
this feature is needed by Spark SQL users and whether it is worth adding,
because as far as I can see, this feature is not very common.

An alternative way to achieve this in current Spark SQL is to use an
Aggregator with the Dataset API. You can write a custom Aggregator that has
a user-defined JVM object as a buffer to hold the input data for your
aggregate function. But you may need to write the necessary encoder for the
buffer object.
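
A minimal sketch of such an Aggregator, assuming a hypothetical `Reading` record type and a Kryo encoder for the buffer (both are choices made for this illustration). The buffer holds every (timestamp, value) pair of a key, and the sort happens once in `finish`, so the result does not depend on the order in which rows or partial buffers arrive. The cost is that all values of a key are held in memory.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical record type used only for this illustration.
final case class Reading(key: Int, ts: Long, value: String)

object ConcatByTs extends Aggregator[Reading, Vector[(Long, String)], String] {
  // Empty buffer for a new group.
  def zero: Vector[(Long, String)] = Vector.empty

  // Append one input row to the buffer.
  def reduce(buf: Vector[(Long, String)], r: Reading): Vector[(Long, String)] =
    buf :+ (r.ts -> r.value)

  // Combine partial buffers; their relative order does not matter.
  def merge(a: Vector[(Long, String)], b: Vector[(Long, String)]): Vector[(Long, String)] =
    a ++ b

  // Sort by timestamp only once, when the group is complete.
  def finish(buf: Vector[(Long, String)]): String =
    buf.sortBy(_._1).map(_._2).mkString

  def bufferEncoder: Encoder[Vector[(Long, String)]] = Encoders.kryo[Vector[(Long, String)]]
  def outputEncoder: Encoder[String] = Encoders.STRING
}
```

With a `Dataset[Reading]` named `ds`, this would be applied as `ds.groupByKey(_.key).agg(ConcatByTs.toColumn)`.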

If you really need this feature, you may open a Jira to ask for others'
opinions about it.





