You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@cassandra.apache.org by Riccardo Ferrari <fe...@gmail.com> on 2018/09/26 13:58:50 UTC

About UDF/UDA

Hi users!

Given my Cassandra version 3.0.x I don't have the famous GROUP BY operator
available. So looking around I turned to UDAs.

I'm aware all/most of the magic happens on the coordinator and the plan is
to keep the data volume low to avoid too much pressure.

Q1: How much is low volume. It's obvious the answer is depends but, has
anyone some experience to share?

Q2: Do I undestand correctly is does not support pagination?

I need something as simple as extract `min`, `max`, `average` and  `count`
per group where I don't know the actual group - I can't fire a query per
each group name. - so something like `SELECT my_uda(field1, field2) WHERE
...;`
This leads to:
- a function that tracks min, max and sum up count and average. The state
is a tuple
- a final function that computes the average.
- the aggregate function that uses the previous two
the result is something like
| 'item': (min_value, max_value, avg_value, count) , 'item2': (...),  ...|
Q3: Is there a way to `flatten` or `explode` the result into multiple lines
?
If Q3 answer is yes: Is there a way to create multiple columns out of the
result:
||other_fileds | item | min | max | avg | count||

BONUS: Are there altenative? Should I really take into account upgrading to
3.11.X ?
Thanks!

Re: About UDF/UDA

Posted by DuyHai Doan <do...@gmail.com>.
I'm afraid you cannot have a proper tabular formatting or an expand to
multiple rows (which changes significantly the semantics)

Indeed the result of the final func is returned by CQL as a whole column
and currently there is no way to change the output formatting


On Thu, Sep 27, 2018 at 6:55 AM, Riccardo Ferrari <fe...@gmail.com>
wrote:

> Thank you Doan,
>
>  Indeed I'm using a FINALFUNC to compute the average already.
> Bit more context, I'm working on bucketized data, each bucket has already
> an 'event count' and an 'average' in it.
> My functions look as follow:
>
> //SFUNC
> CREATE OR REPLACE FUNCTION summaryState(state map<text, frozen<
> tuple<bigint, bigint, bigint, bigint> >>, name text, avgloadtime int,
> eventcount int)
>     CALLED ON NULL INPUT
>     RETURNS map<text, frozen< tuple<bigint, bigint, bigint, bigint> >>
>     LANGUAGE java
>     AS $$
>         if (name != null) {
>             com.datastax.driver.core.TupleValue stats =
> (com.datastax.driver.core.TupleValue)state.get(name);
>
>             if (stats == null) {
>                 com.datastax.driver.core.TupleType statsType =
> com.datastax.driver.core.TupleType.of(com.datastax.
> driver.core.ProtocolVersion.NEWEST_SUPPORTED, com.datastax.driver.core.
> CodecRegistry.DEFAULT_INSTANCE, com.datastax.driver.core.DataType.bigint(),
> com.datastax.driver.core.DataType.bigint(),com.
> datastax.driver.core.DataType.bigint(),com.datastax.driver.
> core.DataType.bigint());
>
>                 stats = statsType.newValue(Long.MAX_VALUE, 0L, 0L, 0L);
>             }
>
>             //Track min
>             Long min_ = (Long) stats.getLong(0);
>             min_ = min_ < avgloadtime ?  min_ : avgloadtime;
>             stats.setLong(0, min_);
>
>             //Track max
>             Long max_ = (Long) stats.getLong(1);
>             max_ = max_ > avgloadtime ?  max_ : avgloadtime;
>             stats.setLong(1, max_);
>
>             //Unroll average
>             Long avgSum = (Long) stats.getLong(2);
>             avgSum = avgSum + avgloadtime;
>             stats.setLong(2, avgSum);
>
>             //Event count
>             Long sampleSum = (Long) stats.getLong(3);
>             sampleSum = sampleSum + eventcount;
>             stats.setLong(3, sampleSum);
>
>             state.put(name, stats);
>         }
>         return state;
>     $$;
>
> //FINALFUNC
> CREATE OR REPLACE FUNCTION summaryFinal (state map<text, frozen<
> tuple<bigint, bigint, bigint, bigint> >>)
>     CALLED ON NULL INPUT
>     RETURNS map<text, frozen< tuple<bigint, bigint, bigint, bigint> >>
>     LANGUAGE java
>     AS $$
>         for (Object name : state.keySet()) {
>             com.datastax.driver.core.TupleValue stats =
> (com.datastax.driver.core.TupleValue) state.get(name);
>
>             Long avgSum = stats.getLong(2);
>             Long sampleSum = stats.getLong(3);
>
>             // Workaround: I can't escame the '/' using cql and had to use
> Math.pow
>             double avg_ = avgSum * Math.pow(sampleSum, -1);
>             stats.setLong(2, new Double(avg_).longValue());
>
>             state.put(name, stats);
>         }
>         return state;
>     $$;
> //AGGREGATE
> CREATE OR REPLACE AGGREGATE summary(text, int, int)
>     SFUNC summaryState
>     STYPE map<text, frozen< tuple<bigint, bigint, bigint, bigint> >>
>     FINALFUNC summaryFinal
>     INITCOND {};
>
> This gives me the following output:
> <TABLE>.summary(event, averageloadtime, count)
> ------------------------------------------------------------
> ------------------
>  {'<item1>': (365, 870, 617, 2), '<item2>'': (381, 11668, 6024, 2)}
>
> I would like to have something lke:
> | item        | min          | max        | average     | count |
> -----------------------------------------------------------------------
> | <item1>  | 365         | 870         | 617            | 2        |
> | <item2>  | 381         | 11668     | 6024          | 2        |
>
> Do you know if that is possible?
>
> On Wed, Sep 26, 2018 at 10:21 PM DuyHai Doan <do...@gmail.com> wrote:
>
>> A hint to answer your Q3 is to use a final function to perform the
>> flattening or transformation on the result of the aggregation
>>
>> The syntax of an UDA is:
>>
>> CREATE [OR REPLACE] AGGREGATE [IF NOT EXISTS]
>> aggregateName(type1, type2, …)
>> SFUNC accumulatorFunction
>> STYPE stateType
>> [FINALFUNC finalFunction]
>> INITCOND initCond;
>>
>>
>> The final return type will be the return type of the FINALFUNC and not
>> necessarily the stateType
>>
>> More details by reading my blog post on it: http://www.doanduyhai.com/
>> blog/?p=1876
>>
>> On Wed, Sep 26, 2018 at 3:58 PM, Riccardo Ferrari <fe...@gmail.com>
>> wrote:
>>
>>> Hi users!
>>>
>>> Given my Cassandra version 3.0.x I don't have the famous GROUP BY
>>> operator available. So looking around I turned to UDAs.
>>>
>>> I'm aware all/most of the magic happens on the coordinator and the plan
>>> is to keep the data volume low to avoid too much pressure.
>>>
>>> Q1: How much is low volume. It's obvious the answer is depends but, has
>>> anyone some experience to share?
>>>
>>> Q2: Do I undestand correctly is does not support pagination?
>>>
>>> I need something as simple as extract `min`, `max`, `average` and
>>> `count` per group where I don't know the actual group - I can't fire a
>>> query per each group name. - so something like `SELECT my_uda(field1,
>>> field2) WHERE ...;`
>>> This leads to:
>>> - a function that tracks min, max and sum up count and average. The
>>> state is a tuple
>>> - a final function that computes the average.
>>> - the aggregate function that uses the previous two
>>> the result is something like
>>> | 'item': (min_value, max_value, avg_value, count) , 'item2': (...),
>>> ...|
>>> Q3: Is there a way to `flatten` or `explode` the result into multiple
>>> lines ?
>>> If Q3 answer is yes: Is there a way to create multiple columns out of
>>> the result:
>>> ||other_fileds | item | min | max | avg | count||
>>>
>>> BONUS: Are there altenative? Should I really take into account upgrading
>>> to 3.11.X ?
>>> Thanks!
>>>
>>
>>

Re: About UDF/UDA

Posted by Riccardo Ferrari <fe...@gmail.com>.
Thank you Doan,

 Indeed I'm using a FINALFUNC to compute the average already.
Bit more context, I'm working on bucketized data, each bucket has already
an 'event count' and an 'average' in it.
My functions look as follow:

//SFUNC
CREATE OR REPLACE FUNCTION summaryState(state map<text, frozen<
tuple<bigint, bigint, bigint, bigint> >>, name text, avgloadtime int,
eventcount int)
    CALLED ON NULL INPUT
    RETURNS map<text, frozen< tuple<bigint, bigint, bigint, bigint> >>
    LANGUAGE java
    AS $$
        if (name != null) {
            com.datastax.driver.core.TupleValue stats =
(com.datastax.driver.core.TupleValue)state.get(name);

            if (stats == null) {
                com.datastax.driver.core.TupleType statsType =
com.datastax.driver.core.TupleType.of(com.datastax.driver.core.ProtocolVersion.NEWEST_SUPPORTED,
com.datastax.driver.core.CodecRegistry.DEFAULT_INSTANCE,
com.datastax.driver.core.DataType.bigint(),
com.datastax.driver.core.DataType.bigint(),com.datastax.driver.core.DataType.bigint(),com.datastax.driver.core.DataType.bigint());

                stats = statsType.newValue(Long.MAX_VALUE, 0L, 0L, 0L);
            }

            //Track min
            Long min_ = (Long) stats.getLong(0);
            min_ = min_ < avgloadtime ?  min_ : avgloadtime;
            stats.setLong(0, min_);

            //Track max
            Long max_ = (Long) stats.getLong(1);
            max_ = max_ > avgloadtime ?  max_ : avgloadtime;
            stats.setLong(1, max_);

            //Unroll average
            Long avgSum = (Long) stats.getLong(2);
            avgSum = avgSum + avgloadtime;
            stats.setLong(2, avgSum);

            //Event count
            Long sampleSum = (Long) stats.getLong(3);
            sampleSum = sampleSum + eventcount;
            stats.setLong(3, sampleSum);

            state.put(name, stats);
        }
        return state;
    $$;

//FINALFUNC
CREATE OR REPLACE FUNCTION summaryFinal (state map<text, frozen<
tuple<bigint, bigint, bigint, bigint> >>)
    CALLED ON NULL INPUT
    RETURNS map<text, frozen< tuple<bigint, bigint, bigint, bigint> >>
    LANGUAGE java
    AS $$
        for (Object name : state.keySet()) {
            com.datastax.driver.core.TupleValue stats =
(com.datastax.driver.core.TupleValue) state.get(name);

            Long avgSum = stats.getLong(2);
            Long sampleSum = stats.getLong(3);

            // Workaround: I can't escame the '/' using cql and had to use
Math.pow
            double avg_ = avgSum * Math.pow(sampleSum, -1);
            stats.setLong(2, new Double(avg_).longValue());

            state.put(name, stats);
        }
        return state;
    $$;
//AGGREGATE
CREATE OR REPLACE AGGREGATE summary(text, int, int)
    SFUNC summaryState
    STYPE map<text, frozen< tuple<bigint, bigint, bigint, bigint> >>
    FINALFUNC summaryFinal
    INITCOND {};

This gives me the following output:
<TABLE>.summary(event, averageloadtime, count)
------------------------------------------------------------------------------
 {'<item1>': (365, 870, 617, 2), '<item2>'': (381, 11668, 6024, 2)}

I would like to have something lke:
| item        | min          | max        | average     | count |
-----------------------------------------------------------------------
| <item1>  | 365         | 870         | 617            | 2        |
| <item2>  | 381         | 11668     | 6024          | 2        |

Do you know if that is possible?

On Wed, Sep 26, 2018 at 10:21 PM DuyHai Doan <do...@gmail.com> wrote:

> A hint to answer your Q3 is to use a final function to perform the
> flattening or transformation on the result of the aggregation
>
> The syntax of an UDA is:
>
> CREATE [OR REPLACE] AGGREGATE [IF NOT EXISTS]
> aggregateName(type1, type2, …)
> SFUNC accumulatorFunction
> STYPE stateType
> [FINALFUNC finalFunction]
> INITCOND initCond;
>
>
> The final return type will be the return type of the FINALFUNC and not
> necessarily the stateType
>
> More details by reading my blog post on it:
> http://www.doanduyhai.com/blog/?p=1876
>
> On Wed, Sep 26, 2018 at 3:58 PM, Riccardo Ferrari <fe...@gmail.com>
> wrote:
>
>> Hi users!
>>
>> Given my Cassandra version 3.0.x I don't have the famous GROUP BY
>> operator available. So looking around I turned to UDAs.
>>
>> I'm aware all/most of the magic happens on the coordinator and the plan
>> is to keep the data volume low to avoid too much pressure.
>>
>> Q1: How much is low volume. It's obvious the answer is depends but, has
>> anyone some experience to share?
>>
>> Q2: Do I undestand correctly is does not support pagination?
>>
>> I need something as simple as extract `min`, `max`, `average` and
>> `count` per group where I don't know the actual group - I can't fire a
>> query per each group name. - so something like `SELECT my_uda(field1,
>> field2) WHERE ...;`
>> This leads to:
>> - a function that tracks min, max and sum up count and average. The state
>> is a tuple
>> - a final function that computes the average.
>> - the aggregate function that uses the previous two
>> the result is something like
>> | 'item': (min_value, max_value, avg_value, count) , 'item2': (...),  ...|
>> Q3: Is there a way to `flatten` or `explode` the result into multiple
>> lines ?
>> If Q3 answer is yes: Is there a way to create multiple columns out of the
>> result:
>> ||other_fileds | item | min | max | avg | count||
>>
>> BONUS: Are there altenative? Should I really take into account upgrading
>> to 3.11.X ?
>> Thanks!
>>
>
>

Re: About UDF/UDA

Posted by DuyHai Doan <do...@gmail.com>.
A hint to answer your Q3 is to use a final function to perform the
flattening or transformation on the result of the aggregation

The syntax of an UDA is:

CREATE [OR REPLACE] AGGREGATE [IF NOT EXISTS]
aggregateName(type1, type2, …)
SFUNC accumulatorFunction
STYPE stateType
[FINALFUNC finalFunction]
INITCOND initCond;


The final return type will be the return type of the FINALFUNC and not
necessarily the stateType

More details by reading my blog post on it:
http://www.doanduyhai.com/blog/?p=1876

On Wed, Sep 26, 2018 at 3:58 PM, Riccardo Ferrari <fe...@gmail.com>
wrote:

> Hi users!
>
> Given my Cassandra version 3.0.x I don't have the famous GROUP BY operator
> available. So looking around I turned to UDAs.
>
> I'm aware all/most of the magic happens on the coordinator and the plan is
> to keep the data volume low to avoid too much pressure.
>
> Q1: How much is low volume. It's obvious the answer is depends but, has
> anyone some experience to share?
>
> Q2: Do I undestand correctly is does not support pagination?
>
> I need something as simple as extract `min`, `max`, `average` and  `count`
> per group where I don't know the actual group - I can't fire a query per
> each group name. - so something like `SELECT my_uda(field1, field2) WHERE
> ...;`
> This leads to:
> - a function that tracks min, max and sum up count and average. The state
> is a tuple
> - a final function that computes the average.
> - the aggregate function that uses the previous two
> the result is something like
> | 'item': (min_value, max_value, avg_value, count) , 'item2': (...),  ...|
> Q3: Is there a way to `flatten` or `explode` the result into multiple
> lines ?
> If Q3 answer is yes: Is there a way to create multiple columns out of the
> result:
> ||other_fileds | item | min | max | avg | count||
>
> BONUS: Are there altenative? Should I really take into account upgrading
> to 3.11.X ?
> Thanks!
>