Posted to dev@calcite.apache.org by James <xu...@gmail.com> on 2017/05/09 15:03:56 UTC

Why does AggregateFunctionImpl.create not implement the merge method?

I was implementing a UDAF based on Calcite, but found that the merge method is
not implemented:

https://github.com/apache/calcite/blob/0938c7b6d767e3242874d87a30d9112512d9243a/core/src/main/java/org/apache/calcite/schema/impl/AggregateFunctionImpl.java#L86
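
For readers new to Calcite UDAFs: AggregateFunctionImpl.create builds a function from a plain Java class by looking up methods by name (such as `init`, `add` and `result`), and the question here is about also picking up a `merge` method. Below is a minimal sketch of such a class, assuming that naming convention. The class name `MySum` and the `demo` driver are hypothetical; since nothing in Calcite calls `merge` at this point, `demo` invokes the methods by hand to show what a caller could do with partial accumulators.

```java
/**
 * Hypothetical UDAF in the init/add/merge/result style. The accumulator is a
 * plain long; a reflective loader would find these methods by name.
 */
class MySum {
  public MySum() {}

  /** Returns an empty accumulator. */
  public long init() {
    return 0L;
  }

  /** Folds one input value into the accumulator. */
  public long add(long accumulator, int value) {
    return accumulator + value;
  }

  /** Combines two partial accumulators, e.g. computed on different threads. */
  public long merge(long accumulator0, long accumulator1) {
    return accumulator0 + accumulator1;
  }

  /** Turns the final accumulator into the aggregate's value. */
  public long result(long accumulator) {
    return accumulator;
  }

  /** Calls the methods by hand: two partial sums, then merge. */
  static long demo() {
    MySum f = new MySum();
    long left = f.add(f.add(f.init(), 1), 2); // partial sum of {1, 2}
    long right = f.add(f.init(), 3);          // partial sum of {3}
    return f.result(f.merge(left, right));
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints 6
  }
}
```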

I wonder why it is not implemented. As a newcomer to Calcite, I don't think
merge would be hard to implement. Is there an issue blocking the
implementation of merge, or is it just a matter of time? If it is just a
matter of time, I'd like to spend some time implementing it.

Thanks in advance.
James

Re: Why does AggregateFunctionImpl.create not implement the merge method?

Posted by Julian Hyde <jh...@apache.org>.
So, let's figure out how to model this as a relational operator.

I think you're describing what in Hadoop terminology would be called a
reducer. Take a look at our Exchange operator [1], which moves data so
that it is partitioned by a particular key; it has a subclass,
SortExchange, which does that and also sorts, and therefore models
Hadoop's shuffle operation. Maybe you'd expect Exchange to have
multiple inputs and outputs, but in fact it has one input and one
output, which are physically partitioned.

In relational algebra terms, you need an Aggregate. Like Exchange's
output, the input to your Aggregate is physically partitioned on the
key into multiple "streams". (The input may also be unique on the key,
and/or sorted on the key, and your implementation might require that,
or at least exploit it.)
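
A rough in-memory sketch of the Exchange-then-Aggregate shape described above (plain Java, not Calcite classes): rows are hash-partitioned on the key, which is the job Exchange does, and each partition is then aggregated on its own. Because every row with a given key lands in the same partition, each partition's output is already final for its keys, so no merge step is needed. The `Row` type, routing by `hashCode`, and the partition count are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionedAggregate {
  /** A (key, value) row; hypothetical row type for this sketch. */
  static final class Row {
    final String key;
    final int value;

    Row(String key, int value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Exchange-like step: sends each row to partition floorMod(hash(key), n). */
  static List<List<Row>> partition(List<Row> rows, int n) {
    List<List<Row>> parts = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      parts.add(new ArrayList<>());
    }
    for (Row r : rows) {
      parts.get(Math.floorMod(r.key.hashCode(), n)).add(r);
    }
    return parts;
  }

  /** Aggregate step: SUM(value) GROUP BY key, within a single partition. */
  static Map<String, Long> sumByKey(List<Row> part) {
    Map<String, Long> sums = new TreeMap<>();
    for (Row r : part) {
      sums.merge(r.key, (long) r.value, Long::sum);
    }
    return sums;
  }

  /** Aggregates every partition independently and concatenates the results. */
  static Map<String, Long> run(List<Row> rows, int n) {
    Map<String, Long> out = new TreeMap<>();
    for (List<Row> part : partition(rows, n)) {
      // No key appears in two partitions, so putAll never overwrites a value.
      out.putAll(sumByKey(part));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Row> rows = Arrays.asList(
        new Row("a", 1), new Row("b", 2), new Row("a", 3), new Row("c", 4));
    System.out.println(run(rows, 3)); // prints {a=4, b=2, c=4}
  }
}
```

Running `run(rows, 1)` and `run(rows, 3)` yields the same map, which is the property that lets a partitioned plan stand in for a single-stream one.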

So, it follows that your "merge" operation is an aggregate function
(albeit one that we don't expose to the end user). You can accomplish
"merge(v0, v1, ..., vn)" by
"init().add(v0).add(v1)....add(vn).result()", which should be just as
efficient after the optimizing compiler has kicked in.

As an example, the "merge" operation for MIN is MIN. The "merge"
operation for COUNT is SUM0 (a variant of SUM that returns 0 when
given no input values).
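
The two examples above can be tried in a small sketch that assumes a hypothetical `Agg` interface with only `init`, `add` and `result` (no `merge`). Each bundle is aggregated separately, and the partial results are then combined by running another ordinary aggregate over them: MIN over the partial minima, SUM0 over the partial counts. This is illustrative Java, not Calcite's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergeViaRollup {
  /** Hypothetical aggregate with no merge method: init, add, result only. */
  interface Agg<A, V, R> {
    A init();
    A add(A acc, V value);
    R result(A acc);
  }

  /** init().add(v0)...add(vn).result() over one list of values. */
  static <A, V, R> R run(Agg<A, V, R> agg, List<V> values) {
    A acc = agg.init();
    for (V v : values) {
      acc = agg.add(acc, v);
    }
    return agg.result(acc);
  }

  /** COUNT(*): adds one per input value. */
  static final Agg<Long, Integer, Long> COUNT = new Agg<Long, Integer, Long>() {
    public Long init() { return 0L; }
    public Long add(Long acc, Integer v) { return acc + 1; }
    public Long result(Long acc) { return acc; }
  };

  /** SUM0: sums its inputs and returns 0, not null, when there are none. */
  static final Agg<Long, Long, Long> SUM0 = new Agg<Long, Long, Long>() {
    public Long init() { return 0L; }
    public Long add(Long acc, Long v) { return acc + v; }
    public Long result(Long acc) { return acc; }
  };

  /** MIN: a null accumulator means "no value seen yet"; null inputs are skipped. */
  static final Agg<Integer, Integer, Integer> MIN = new Agg<Integer, Integer, Integer>() {
    public Integer init() { return null; }
    public Integer add(Integer acc, Integer v) {
      if (v == null) {
        return acc;
      }
      return acc == null ? v : Math.min(acc, v);
    }
    public Integer result(Integer acc) { return acc; }
  };

  /** COUNT each bundle, then merge the partial counts with SUM0. */
  static long count(List<List<Integer>> bundles) {
    List<Long> partials = new ArrayList<>();
    for (List<Integer> b : bundles) {
      partials.add(run(COUNT, b));
    }
    return run(SUM0, partials);
  }

  /** MIN each bundle, then merge the partial minima with MIN. */
  static Integer min(List<List<Integer>> bundles) {
    List<Integer> partials = new ArrayList<>();
    for (List<Integer> b : bundles) {
      partials.add(run(MIN, b)); // null for an empty bundle
    }
    return run(MIN, partials);
  }

  public static void main(String[] args) {
    List<List<Integer>> bundles = Arrays.asList(
        Arrays.asList(5, 3), Arrays.<Integer>asList(), Arrays.asList(7));
    System.out.println(count(bundles)); // prints 3
    System.out.println(min(bundles));   // prints 3
  }
}
```

SUM0 rather than SUM matters when there are no bundles at all: `count` of an empty list returns 0, as COUNT must, where a SUM that returns null on no input would not. MIN simply skips the null produced by an empty bundle.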

I think that SubstitutionVisitor.getRollup() [2] is on the right
track by returning a SqlAggFunction, and we were wrong to add an
"A merge(A, A)" method to our UDAF API.
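
The "return an aggregate function" approach can be pictured as a lookup from an aggregate to the aggregate that combines its partial results. The enum and the method name `rollupOf` below are hypothetical and are not Calcite's SqlAggFunction API. The MIN and COUNT entries come from the examples above; SUM, SUM0 and MAX follow the same reasoning, and AVG is included to show a function with no single-function roll-up.

```java
import java.util.Optional;

class Rollup {
  /** Hypothetical subset of aggregate functions. */
  enum AggKind { COUNT, SUM, SUM0, MIN, MAX, AVG }

  /**
   * Returns the aggregate that combines partial results of {@code agg},
   * or empty when the partial results alone are not enough.
   */
  static Optional<AggKind> rollupOf(AggKind agg) {
    switch (agg) {
      case SUM:
      case SUM0:
      case MIN:
      case MAX:
        return Optional.of(agg);          // combine partials with the same function
      case COUNT:
        return Optional.of(AggKind.SUM0); // add partial counts; 0 when there are none
      default:
        return Optional.empty();          // an AVG of partial AVGs is wrong in general
    }
  }

  public static void main(String[] args) {
    for (AggKind k : AggKind.values()) {
      System.out.println(k + " -> " + rollupOf(k).map(AggKind::name).orElse("(none)"));
    }
  }
}
```

With an accumulator-level `merge`, AVG can still be combined because its accumulator can carry `{sum, count}`; a roll-up that only sees finished AVG values cannot.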

Julian

[1] https://calcite.apache.org/apidocs/org/apache/calcite/rel/core/Exchange.html

[2] https://insight.io/github.com/apache/calcite/blob/60b4f4eb10a018e7d6ab8ae4f6ac0f4d0b598b1f/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java?line=1316


Re: Why does AggregateFunctionImpl.create not implement the merge method?

Posted by James <xu...@gmail.com>.
Hi Julian, I need some help here:

I read the source code and found that user-defined aggregate functions are
eventually invoked here:


https://github.com/apache/calcite/blob/ce2122ff2562340333bfa0ba371872fc9a9c6251/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L826

The code here is easy to understand, since all the data is processed in
this single method, in a single thread. The invocation process is
something like:

       (repeatedly)
       +----------+
       |          |
       |          |
init()-+--> add() +--> result()
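
The single-threaded flow in the diagram is one left fold over the input. Below is a minimal sketch, assuming a hypothetical `Accumulator` interface (this is not the linq4j code linked above). It uses an average function because its accumulator, `{sum, count}`, differs from its result; the array accumulator is mutated in place, which is safe only because a single thread owns it.

```java
import java.util.Arrays;

class SequentialAggregate {
  /** Hypothetical UDAF shape: init, add, result. */
  interface Accumulator<A, V, R> {
    A init();
    A add(A acc, V value);
    R result(A acc);
  }

  /** init() once, add() for each value in order, result() once. */
  static <A, V, R> R aggregate(Accumulator<A, V, R> f, Iterable<V> input) {
    A acc = f.init();
    for (V v : input) {
      acc = f.add(acc, v);
    }
    return f.result(acc);
  }

  /** AVG over ints; the accumulator is {sum, count}, the result a Double. */
  static final Accumulator<long[], Integer, Double> AVG =
      new Accumulator<long[], Integer, Double>() {
        public long[] init() { return new long[] {0L, 0L}; }
        public long[] add(long[] acc, Integer v) {
          acc[0] += v;
          acc[1]++;
          return acc;
        }
        public Double result(long[] acc) {
          return acc[1] == 0 ? null : (double) acc[0] / acc[1]; // null for no rows
        }
      };

  public static void main(String[] args) {
    System.out.println(aggregate(AVG, Arrays.asList(1, 2, 3, 4))); // prints 2.5
  }
}
```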

Now, if we introduce `merge` into this process, the input data will first
be split into bundles; each bundle produces an intermediate result, and
in the end these intermediate results are `merged` into a final result,
something like:

                  (repeatedly)
                  +----------+
                  |          |
                  |          |
           init()-+--> add() +--> intermediate result  +
                                                       |
                  (repeatedly)                         |
                  +----------+                         |
                  |          |                         |
                  |          |                         |
           init()-+--> add() +--> intermediate result  +--> merge() +--> final result
                                                       |
                                                       |
                  (repeatedly)                         |
                  +----------+                         |
                  |          |                         |
                  |          |                         |
           init()-+--> add() +--> intermediate result  +



There will be multiple threads involved here, and the merge method will be
called at the end of this process, in a separate thread.
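
The bundled flow can be approximated with a thread pool. In this hypothetical sketch (standard java.util.concurrent only, nothing Calcite provides), each bundle is folded with `init`/`add` on a pool thread, and the intermediate results are then combined with `merge` on the calling thread. The `Udaf` interface and the sum-of-squares function are assumptions; the code also assumes `init()` is an identity for `merge`, which holds for sums.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelAggregate {
  /** Hypothetical UDAF shape, this time with merge. */
  interface Udaf<A, V, R> {
    A init();
    A add(A acc, V value);
    A merge(A left, A right);
    R result(A acc);
  }

  /** Sum of squares; accumulator and result are both Long. */
  static final Udaf<Long, Integer, Long> SUM_SQUARES = new Udaf<Long, Integer, Long>() {
    public Long init() { return 0L; }
    public Long add(Long acc, Integer v) { return acc + v.longValue() * v; }
    public Long merge(Long left, Long right) { return left + right; }
    public Long result(Long acc) { return acc; }
  };

  /** Folds each bundle on a pool thread, then merges the partials here. */
  static <A, V, R> R aggregate(Udaf<A, V, R> f, List<List<V>> bundles)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, bundles.size()));
    try {
      // Phase 1: one task per bundle, each producing an intermediate result.
      List<Future<A>> partials = new ArrayList<>();
      for (List<V> bundle : bundles) {
        partials.add(pool.submit(() -> {
          A acc = f.init();
          for (V v : bundle) {
            acc = f.add(acc, v);
          }
          return acc;
        }));
      }
      // Phase 2: merge intermediate results in bundle order, then finish.
      A total = f.init();
      for (Future<A> p : partials) {
        total = f.merge(total, p.get());
      }
      return f.result(total);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    List<List<Integer>> bundles = Arrays.asList(
        Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5));
    System.out.println(aggregate(SUM_SQUARES, bundles)); // prints 55
  }
}
```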

Two questions here:

* How do I represent the `multiple threads/bundles` logic in Calcite?
* Where should the merge method go?

Is there already a construct or framework I can use or learn from, or do I
need to write this new logic from scratch?


Re: Why does AggregateFunctionImpl.create not implement the merge method?

Posted by James <xu...@gmail.com>.
Julian, thanks for the guidance. I will log an issue and try to implement it.

Re: Why does AggregateFunctionImpl.create not implement the merge method?

Posted by Julian Hyde <jh...@apache.org>.
I don't think it's that hard. We just never got round to it. Can you
log a JIRA case and submit a pull request when you're done? Be sure
to add a test similar to UdfTest.testUserDefinedAggregateFunction.

Also, there's no code currently that would call merge, but don't let
that stop you. EnumerableAggregate generates code that calls
EnumerableDefaults.groupBy, which groups all values of the same key
together. We'd use merge if we used a parallel or distributed
algorithm.
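
A simplified sketch of that single-node path, in the spirit of EnumerableDefaults.groupBy but not its actual signature: one accumulator per key, fed by `init` and `add`, finished with `result`. The `Agg` interface and the `int[]` rows (`{deptno, salary}`) are assumptions. Since all values of a key meet in one accumulator, nothing here ever needs `merge`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class GroupByAggregate {
  /** Hypothetical accumulator-style aggregate (no merge). */
  interface Agg<A, V, R> {
    A init();
    A add(A acc, V value);
    R result(A acc);
  }

  /** SUM over ints, accumulated as a Long. */
  static final Agg<Long, Integer, Long> SUM = new Agg<Long, Integer, Long>() {
    public Long init() { return 0L; }
    public Long add(Long acc, Integer v) { return acc + v; }
    public Long result(Long acc) { return acc; }
  };

  /** One accumulator per key; keys keep first-seen order. */
  static <T, K, V, A, R> Map<K, R> groupBy(List<T> rows, Function<T, K> keyFn,
      Function<T, V> valueFn, Agg<A, V, R> agg) {
    Map<K, A> accs = new LinkedHashMap<>();
    for (T row : rows) {
      K key = keyFn.apply(row);
      A acc = accs.containsKey(key) ? accs.get(key) : agg.init();
      accs.put(key, agg.add(acc, valueFn.apply(row)));
    }
    Map<K, R> out = new LinkedHashMap<>();
    for (Map.Entry<K, A> e : accs.entrySet()) {
      out.put(e.getKey(), agg.result(e.getValue()));
    }
    return out;
  }

  public static void main(String[] args) {
    List<int[]> rows = Arrays.asList(new int[] {10, 100}, new int[] {20, 50}, new int[] {10, 30});
    // Roughly: SELECT deptno, SUM(salary) ... GROUP BY deptno
    System.out.println(groupBy(rows, r -> r[0], r -> r[1], SUM)); // prints {10=130, 20=50}
  }
}
```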

Another piece of code that could call merge is roll-up: see
SubstitutionVisitor.getRollup.
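
Roll-up is the other place a merge-like step shows up: a query grouped by a coarser key can be answered from a finer-grained aggregate, such as a materialized view of counts per `(deptno, job)`. Below is a hypothetical sketch with made-up in-memory rows: rolling COUNT up to `deptno` means summing the partial counts (the SUM0 rule for COUNT given earlier in this thread), not counting view rows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RollupFromView {
  /** Hypothetical row of a pre-aggregated view: COUNT(*) per (deptno, job). */
  static final class ViewRow {
    final int deptno;
    final String job;
    final long cnt;

    ViewRow(int deptno, String job, long cnt) {
      this.deptno = deptno;
      this.job = job;
      this.cnt = cnt;
    }
  }

  /** Answers COUNT(*) GROUP BY deptno from the finer-grained view. */
  static Map<Integer, Long> countByDept(List<ViewRow> view) {
    Map<Integer, Long> out = new TreeMap<>();
    for (ViewRow r : view) {
      // Roll up COUNT by adding partial counts (SUM0), not by counting view rows.
      out.merge(r.deptno, r.cnt, Long::sum);
    }
    return out;
  }

  public static void main(String[] args) {
    List<ViewRow> view = Arrays.asList(
        new ViewRow(10, "CLERK", 3), new ViewRow(10, "MANAGER", 1), new ViewRow(20, "CLERK", 2));
    System.out.println(countByDept(view)); // prints {10=4, 20=2}
  }
}
```

Counting the view rows instead would give {10=2, 20=1}, which is the mistake a roll-up function is there to prevent.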

Julian

