Posted to dev@spark.apache.org by Nick Pentreath <ni...@gmail.com> on 2015/09/12 10:07:28 UTC

Re: HyperLogLogUDT

Inspired by this post:
http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/,
I've started putting together something based on the Spark 1.5 UDAF
interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141

Some questions -

1. How do I get the UDAF to accept input arguments of different types? We
can hash basically anything for HLL - Int, Long, String, Object, raw bytes
etc. Right now it seems we'd need to build a new UDAF for each input type,
which seems strange - I should be able to use one UDAF that can handle raw
input of different types, as well as handle existing HLLs that can be
merged/aggregated (e.g. for grouped data)
2. @Reynold, how would I ensure this works for Tungsten (ie against raw
bytes in memory)? Or does the new Aggregate2 stuff automatically do that?
Where should I look for examples on how this works internally?
3. I've based this on the Sum and Avg examples for the new UDAF interface -
if you have any suggestions or spot an issue, please advise. Is the
intermediate buffer efficient?
4. The current HyperLogLogUDT is private - so I've had to make my own copy,
which is a bit pointless as it's copy-pasted. Any thoughts on exposing that
type? Otherwise I need to put my code in the spark.sql package ...

Nick
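
To make the shape concrete, here is a minimal sketch against the Spark 1.5
UserDefinedAggregateFunction interface, assuming StreamLib's HyperLogLogPlus
(the class name HLLBinaryUDAF and the precision parameter p are illustrative,
not taken from the gist):

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    // Aggregates a string column into serialized HLL++ registers (BinaryType),
    // so the output of one aggregation can itself be merged by a later one.
    class HLLBinaryUDAF(p: Int = 12) extends UserDefinedAggregateFunction {
      def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
      def bufferSchema: StructType = StructType(StructField("hll", BinaryType) :: Nil)
      def dataType: DataType = BinaryType
      def deterministic: Boolean = true

      def initialize(buffer: MutableAggregationBuffer): Unit =
        buffer.update(0, new HyperLogLogPlus(p, 0).getBytes)

      def update(buffer: MutableAggregationBuffer, input: Row): Unit =
        if (!input.isNullAt(0)) {
          val hll = HyperLogLogPlus.Builder.build(buffer.getAs[Array[Byte]](0))
          hll.offer(input.getString(0))
          buffer.update(0, hll.getBytes)
        }

      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val merged = HyperLogLogPlus.Builder.build(buffer1.getAs[Array[Byte]](0))
        merged.addAll(HyperLogLogPlus.Builder.build(buffer2.getAs[Array[Byte]](0)))
        buffer1.update(0, merged.getBytes)
      }

      def evaluate(buffer: Row): Any = buffer.getAs[Array[Byte]](0)
    }

Note the serialize/deserialize round trip on every row in update - that
per-update cost is exactly what the rest of this thread discusses, and is the
motivation for keeping the registers directly in the buffer instead.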

On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <rx...@databricks.com> wrote:

> Yes - it's very interesting. However, ideally we should have a version of
> hyperloglog that can work directly against some raw bytes in memory (rather
> than java objects), in order for this to fit the Tungsten execution model
> where everything is operating directly against some memory address.
>
> On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath <ni...@gmail.com>
> wrote:
>
>> Sure I can copy the code but my aim was more to understand:
>>
>> (a) if this is broadly interesting enough to folks to think about
>> updating / extending the existing UDAF within Spark
>> (b) how to register one's own custom UDAF - in which case it could be a
>> Spark package for example
>>
>> All examples deal with registering a UDF but nothing about UDAFs
>>
>>
>>
>> On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos <
>> daniel.darabos@lynxanalytics.com> wrote:
>>
>>> It's already possible to just copy the code from countApproxDistinct
>>> <https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153> and
>>> access the HLL directly, or do anything you like.
>>>
>>> On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath <nick.pentreath@gmail.com
>>> > wrote:
>>>
>>>> Any thoughts?
>>>>
>>>>
>>>>
>>>> On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath <
>>>> nick.pentreath@gmail.com> wrote:
>>>>
>>>>> Hey Spark devs
>>>>>
>>>>> I've been looking at DF UDFs and UDAFs. The approx distinct is using
>>>>> hyperloglog,
>>>>> but there is only an option to return the count as a Long.
>>>>>
>>>>> It can be useful to be able to return and store the actual data
>>>>> structure (ie serialized HLL). This effectively allows one to do
>>>>> aggregation / rollups over columns while still preserving the ability to
>>>>> get distinct counts.
>>>>>
>>>>> For example, one can store daily aggregates of events, grouped by
>>>>> various columns, while storing for each grouping the HLL of say unique
>>>>> users. So you can get the uniques per day directly but could also very
>>>>> easily do arbitrary aggregates (say monthly, annually) and still be able to
>>>>> get a unique count for that period by merging the daily HLLs (a merge
>>>>> sketch follows this quoted thread).
>>>>>
>>>>> I did this a while back as a Hive UDAF (
>>>>> https://github.com/MLnick/hive-udf) which returns a Struct field
>>>>> containing a "cardinality" field and a "binary" field containing the
>>>>> serialized HLL.
>>>>>
>>>>> I was wondering if there would be interest in something like this? I
>>>>> am not so clear on how UDTs work with regards to SerDe - so could one adapt
>>>>> the HyperLogLogUDT to be a Struct with the serialized HLL as a field as
>>>>> well as count as a field? Then I assume this would automatically play
>>>>> nicely with DataFrame I/O etc. The gotcha is one needs to then call
>>>>> "approx_count_field.count" (or is there a concept of a "default field" for
>>>>> a Struct?).
>>>>>
>>>>> Also, being able to provide the bitsize parameter may be useful...
>>>>>
>>>>> The same thinking would apply potentially to other approximate (and
>>>>> mergeable) data structures like T-Digest and maybe CMS.
>>>>>
>>>>> Nick
>>>>>
>>>>
>>>>
>>>
>>
>
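
As a concrete companion to the rollup idea in the quoted June 23 message, a
sketch of merging stored daily sketches into a monthly distinct count with
StreamLib (monthlyUniques and dailySketches are illustrative names):

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

    // Each element is the serialized HLL stored with one day's aggregates.
    def monthlyUniques(dailySketches: Seq[Array[Byte]]): Long = {
      val merged = HyperLogLogPlus.Builder.build(dailySketches.head)
      dailySketches.tail.foreach { bytes =>
        merged.addAll(HyperLogLogPlus.Builder.build(bytes))
      }
      merged.cardinality() // approximate distinct count for the whole month
    }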

Re: HyperLogLogUDT

Posted by Yin Huai <yh...@databricks.com>.
The user implementing a UDAF does not need to consider what the underlying
buffer is. Our aggregate operator will figure out if the buffer data types of
all aggregate functions used by a query are supported by UnsafeRow. If so, we
will use UnsafeRow as the buffer.

Regarding performance, a UDAF is not as efficient as our built-in aggregate
functions, mainly because (1) users implement UDAFs with JVM data types, not
SQL data types (e.g. in a UDAF you will use String, not UTF8String, which is
our SQL data type), and (2) UDAF does not support code generation.

For handling different data types for an argument, having multiple UDAF
classes is the way to go for now. We will consider the right way to support
specifying multiple possible data types for an argument.

Thanks,

Yin
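
As an illustration of the rule Yin describes (my reading of it, with
BinaryType standing in for any variable-length type):

    import org.apache.spark.sql.types._

    // All fixed-width fields: the buffer can be backed by an UnsafeRow.
    val fixedWidthBuffer = StructType(
      StructField("count", LongType) :: StructField("sum", DoubleType) :: Nil)

    // A variable-length field (e.g. serialized HLL registers) would force
    // the generic, non-UnsafeRow aggregation buffer.
    val variableLengthBuffer = StructType(StructField("hll", BinaryType) :: Nil)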


Re: HyperLogLogUDT

Posted by Nick Pentreath <ni...@gmail.com>.
Thanks Yin

So how does one ensure a UDAF works with Tungsten and UnsafeRow buffers? Or
is this something that will be included in the UDAF interface in future?

Is there a performance difference between extending UDAF vs Aggregate2?

It's also not clear to me how to handle inputs of different types. What if my
UDAF can handle String and Long for example? Do I need to specify AnyType, or
is there a way to specify multiple possible types for a single input column?

If there is no performance difference and UDAF can work with Tungsten, then,
Herman, does it perhaps make sense to use UDAF (but without a UDT, as you've
done for performance)? It would then be easy to extend that UDAF and adjust
the output types as needed. It also provides a really nice example of how to
use the interface for something advanced and high performance.
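
On the multiple-input-type point: until the interface supports it, one
workaround is to declare the UDAF over a single input type and normalize at
the call site - a sketch, reusing the hypothetical HLLBinaryUDAF from earlier
in the thread (events is a stand-in DataFrame):

    import org.apache.spark.sql.functions.col

    val hllBinary = new HLLBinaryUDAF() // declared over StringType only

    // Cast whatever the column holds (Long, Int, ...) to that one type:
    val daily = events.groupBy(col("date"))
      .agg(hllBinary(col("user_id").cast("string")).as("users_hll"))

The caveat is that the cast changes the bytes being hashed, so the same value
must always be cast the same way for sketches to merge consistently.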




Re: HyperLogLogUDT

Posted by Yin Huai <yh...@databricks.com>.
Hi Nick,

The buffer exposed to the UDAF interface is just a view of the underlying
buffer (this underlying buffer is shared by different aggregate functions,
and every function takes one or multiple slots). If you need a UDAF,
extending UserDefinedAggregationFunction is the preferred approach.
AggregateFunction2 is used for built-in aggregate functions.

Thanks,

Yin
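
Since the July part of the thread asked how one registers a custom UDAF: with
the 1.5 interface a sketch looks like this, assuming the hypothetical
HLLBinaryUDAF from earlier (I believe UDFRegistration gained a UDAF overload
in 1.5, so treat the exact call as an assumption):

    // Register an instance so it can be used from SQL as well as DataFrames:
    sqlContext.udf.register("hll_binary", new HLLBinaryUDAF())

    sqlContext.sql(
      "SELECT date, hll_binary(CAST(user_id AS STRING)) AS users_hll " +
      "FROM events GROUP BY date")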


Re: HyperLogLogUDT

Posted by Nick Pentreath <ni...@gmail.com>.
Ok, that makes sense. So this is (a) more efficient, since as far as I can
see it is updating the HLL registers directly in the buffer for each value,
and (b) would be "Tungsten-compatible" as it can work against UnsafeRow? Is
it currently possible to specify an UnsafeRow as a buffer in a UDAF?

So is extending AggregateFunction2 the preferred approach over the
UserDefinedAggregationFunction interface? Or is that internal only?

I see one of the main use cases for things like HLL / CMS and other
approximate data structures being the fact that you can store them as
columns representing distinct counts in an aggregation. And then do further
arbitrary aggregations on that data as required. e.g. store hourly
aggregate data, and compute daily or monthly aggregates from that, while
still keeping the ability to have distinct counts on certain fields.

So exposing the serialized HLL as Array[Byte] say, so that it can be
further aggregated in a later DF operation, or saved to an external data
source, would be super useful.
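
A sketch of that two-step flow, assuming the hypothetical HLLBinaryUDAF from
earlier plus an hllMerge UDAF (also hypothetical) that takes the BinaryType
column and merges the registers; events and sqlContext are stand-ins:

    import org.apache.spark.sql.functions.col

    // Step 1: hourly aggregation emits serialized HLLs as a BinaryType column.
    val hourly = events.groupBy(col("day"), col("hour"), col("page"))
      .agg(hllBinary(col("user_id").cast("string")).as("users_hll"))
    hourly.write.parquet("/data/hourly_agg") // Array[Byte] round-trips fine

    // Step 2: later, roll hourly sketches up to daily without the raw events.
    val daily = sqlContext.read.parquet("/data/hourly_agg")
      .groupBy(col("day"), col("page"))
      .agg(hllMerge(col("users_hll")).as("users_hll"))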




Re: HyperLogLogUDT

Posted by Herman van Hövell tot Westerflier <hv...@questtec.nl>.
I am typically all for code re-use. The reason for writing this is to
prevent the indirection of a UDT and work directly against memory. A UDT
will work fine at the moment because we still use
GenericMutableRow/SpecificMutableRow as aggregation buffers. However if you
would use an UnsafeRow as an AggregationBuffer (which is attractive when
you have a lot of groups during aggregation) the use of a UDT is either
impossible or it would become very slow because it would require us to
deserialize/serialize a UDT on every update.

As for compatibility, the implementation produces exactly the same results
as the ClearSpring implementation. You could easily export the HLL++
register values into the current ClearSpring implementation and work with
those.

Met vriendelijke groet/Kind regards,

Herman van Hövell tot Westerflier

QuestTec B.V.
Torenwacht 98
2353 DC Leiderdorp
hvanhovell@questtec.nl
+599 9 521 4402
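
Spelled out schematically (this is an illustration, not actual Spark
internals; hllUdt is a hypothetical UDT instance):

    // UDT-backed buffer: every single update pays a full SerDe round trip.
    def updateViaUdt(buffer: MutableAggregationBuffer, value: Any): Unit = {
      val hll = hllUdt.deserialize(buffer.get(0)) // bytes -> JVM object
      hll.offer(value)
      buffer.update(0, hllUdt.serialize(hll))     // JVM object -> bytes
    }

    // UDT-less approach, as described above: the buffer's slots are the HLL
    // registers themselves; an update rewrites one word in place, no SerDe.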



Re: HyperLogLogUDT

Posted by Nick Pentreath <ni...@gmail.com>.
I should add that surely the idea behind UDTs is exactly that they can (a)
fit automatically into DataFrames and Tungsten, and (b) be used efficiently
when writing one's own UDTs and UDAFs?



Re: HyperLogLogUDT

Posted by Nick Pentreath <ni...@gmail.com>.
Can I ask why you've done this as a custom implementation rather than using
StreamLib, which is already implemented and widely used? It seems more
portable to me to use a library - for example, I'd like to export the
grouped data with raw HLLs to say Elasticsearch, and then do further
on-demand aggregation in ES and visualization in Kibana etc.

Others may want to do something similar with Hive, Cassandra, HBase or
whatever they are using. In that case they'd need to use this particular
implementation from Spark, which may be tricky to pull in as a dependency.

If there are enhancements, does it not make sense to do a PR to StreamLib?
Or does this interact in some better way with Tungsten?

I am unclear on how the interop with Tungsten raw memory works - some
pointers on that and where to look in the Spark code would be helpful.
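To make the portability point concrete, here is a minimal sketch assuming
StreamLib's HyperLogLogPlus (the precision value is arbitrary): any consumer
with stream-lib on the classpath - a Spark job, an ES plugin, a Hive UDF -
can rebuild and merge the serialized sketches.

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

    // Two daily sketches over user ids.
    val day1 = new HyperLogLogPlus(14) // precision p = 14
    val day2 = new HyperLogLogPlus(14)
    Seq("user-1", "user-2").foreach(day1.offer)
    Seq("user-2", "user-3").foreach(day2.offer)

    // The serialized form is what would be stored per group and
    // exported to ES / Hive / Cassandra alongside the other columns.
    val bytes1 = day1.getBytes
    val bytes2 = day2.getBytes

    // Deserialize and merge, e.g. rolling daily sketches up to monthly.
    val merged = HyperLogLogPlus.Builder.build(bytes1)
    merged.addAll(HyperLogLogPlus.Builder.build(bytes2))
    println(merged.cardinality()) // ~3 unique users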


Re: HyperLogLogUDT

Posted by Herman van Hövell tot Westerflier <hv...@questtec.nl>.
Hello Nick,

I have been working on a (UDT-less) implementation of HLL++. You can find
the PR here: https://github.com/apache/spark/pull/8362. This currently
implements the dense version of HLL++, which is a further development of
HLL. It returns a Long, but it shouldn't be too hard to return a Row
containing the cardinality and/or the HLL registers (the binary data).
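As a rough sketch of that struct-returning variant - using StreamLib's
HyperLogLogPlus rather than the HLL++ code in the PR, with a hypothetical
HLLAgg name, and deliberately ignoring the cost of deserializing the sketch
on every update - a Spark 1.5 UDAF could buffer the serialized sketch as
BinaryType:

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
      UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    class HLLAgg(p: Int = 14) extends UserDefinedAggregateFunction {
      def inputSchema: StructType =
        StructType(StructField("value", StringType) :: Nil)
      def bufferSchema: StructType =
        StructType(StructField("hll", BinaryType) :: Nil)
      // Struct output: the approximate count plus the mergeable registers.
      def dataType: DataType = StructType(
        StructField("cardinality", LongType) ::
        StructField("binary", BinaryType) :: Nil)
      def deterministic: Boolean = true

      private def fromBytes(b: Array[Byte]) = HyperLogLogPlus.Builder.build(b)

      def initialize(buffer: MutableAggregationBuffer): Unit =
        buffer.update(0, new HyperLogLogPlus(p).getBytes)

      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val hll = fromBytes(buffer.getAs[Array[Byte]](0))
        hll.offer(input.getAs[String](0))
        buffer.update(0, hll.getBytes)
      }

      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val left = fromBytes(buffer1.getAs[Array[Byte]](0))
        left.addAll(fromBytes(buffer2.getAs[Array[Byte]](0)))
        buffer1.update(0, left.getBytes)
      }

      def evaluate(buffer: Row): Any = {
        val hll = fromBytes(buffer.getAs[Array[Byte]](0))
        Row(hll.cardinality(), hll.getBytes)
      }
    }

The repeated serialize/deserialize in update is exactly the
intermediate-buffer efficiency concern raised earlier in the thread.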

I am curious what the stance is on using UDTs in the new UDAF interface. Is
this still viable? It wouldn't work with UnsafeRow, for instance. The
OpenHashSetUDT would be a nice building block for CollectSet and all the
distinct aggregate operators. Are there any opinions on this?

Kind regards,

Herman van Hövell tot Westerflier

QuestTec B.V.
Torenwacht 98
2353 DC Leiderdorp
hvanhovell@questtec.nl
+599 9 521 4402

