You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@calcite.apache.org by Anoop Johnson <an...@gmail.com> on 2016/03/07 19:58:42 UTC

Calcite on Partitioned Databases

Hello Everyone -

 We have an in-house distributed database with our own custom query engine.
We're considering using Calcite to replace our current query engine.

I looked through the examples and  one of the adapters. One thing I haven't
quite figured out is using Calcite when the data is partitioned and you
need to fan out the query into all the partitions and combine the partial
results.

There are several challenges here - for instance, some operations like AVG
are not partitionable, so they have to be rewritten as COUNT and SUM and
only at the final step.

We could always write custom code to rewrite the query and reassemble the
results. For instance, I saw a planner rule[1] in the Calcite codebase that
does something similar.

Since this is a common problem, I was wondering if there was a standard way
of handling this use case or if there were any example plugin I could look
at.

Thanks,
Anoop


[1]
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java

Re: Calcite on Partitioned Databases

Posted by Julian Hyde <jh...@apache.org>.
> On Mar 16, 2016, at 10:17 AM, Anoop Johnson <an...@gmail.com> wrote:
> 
> Is there a standardized mechanism for doing the scatter/gather in Calcite?

Calcite does not have a parallel/distributed execution engine. So I wouldn’t say it has a scatter/gather mechanism. What Calcite does provide is algebraic rewrites to enable you to combine partial results. Those rules tend to be similar for a variety of execution engines.

There are various scenarios for combining partial results. If you know the keys don’t overlap you can combine using UNION ALL. If the keys overlap but are sorted you can combine using merge, and the resulting collection will also be sorted. If the keys overlap then you can combine partial aggregates. The SqlSplittableAggFunction interface is our best attempt to describe whether/how partial aggregates can be combined. 

Suppose you have the query 

  SELECT deptno, COUNT(*) AS c FROM emp GROUP BY deptno

and emp has two partitions, and you know these partitions have overlapping deptno values, i.e. emp is equivalent to the view

  CREATE VIEW emp AS SELECT * FROM emp1 UNION ALL SELECT * FROM emp2

Then Calcite will rewrite the query to something equivalent to

  SELECT deptno, SUM(partialCount) AS c
  FROM (
    SELECT deptno, COUNT(*) AS partialCount FROM emp1 GROUP BY deptno
    UNION ALL
    SELECT deptno, COUNT(*) AS partialCount FROM emp2 GROUP BY deptno)
  GROUP BY deptno

The logic is all there, so you don’t need to write any custom code.

If you know deptno values are sorted, you could write a SortedMergeUnion operator (probably a sub-class of Union), and the Calcite rule would generate the ‘input0.partialCount + input1.partialCount’ logic.

Julian


> Even in the absence of it, the scatter part is fairly straightforward. We
> could do something like:
> 
> 1. Query co-ordinator receives the query that needs to run on  a bunch of
> table partitions.
> 2. The query co-ordinator inspects the query and rewrites it. It mainly
> splits the non-partitionable aggregate operations (like AVG) into
> partitionable aggregate functions.
> 3. It then fans out the query to all the remote partition servers.
> 
> I have not figured out what's the best way to aggregate the partial results
> coming back from the remote partitions. It mainly needs to do two things:
> 
> 1. Combine the partial aggregates for the same group by keys coming from
> multiple partitions.
> 2. Compute the final result - if the original query had split aggregates,
> that needs to be combined.
> 
> I could write custom code to do this, but if there is a way (or example) to
> do this within Calcite, I would appreciate any pointers.
> 
> Thanks,
> Anoop
> 
> On Mon, Mar 7, 2016 at 1:32 PM, Julian Hyde <jh...@apache.org> wrote:
> 
>> Some projects that use Calcite (e.g. Drill) model partitioned tables &
>> data using the Distribution trait. A trait represents a physical property
>> of the data; for example, Collation is another trait, and represents how
>> the data is sorted. The Exchange operator can change the distribution of
>> data, analogous to how the Sort operator changes the Collation of the data.
>> 
>> As you correctly say, it is not valid to compute aggregate functions on
>> partitioned data. You need to combine into a single partition first. (Or
>> you can re-partition by the GROUP BY keys, and then safely combine using
>> Union-all.)
>> 
>> You can compute partial aggregates on the partitions, then roll them up.
>> For example if you want to compute AVG(x) you can expand to SUM(x) /
>> COUNT(x), and compute partition SUM(x) and COUNT(X) on each partition then
>> sum them using SUM. AggregateUnionTransposeRule and
>> AggregateJoinTransposeRule deal in that kind of logic, and
>> SqlSplittableAggFunction is an SPI that could describe how an arbitrary
>> aggregate function could be split.
>> 
>> Julian
>> 
>> 
>> 
>>> On Mar 7, 2016, at 10:58 AM, Anoop Johnson <an...@gmail.com>
>> wrote:
>>> 
>>> Hello Everyone -
>>> 
>>> We have an in-house distributed database with our own custom query
>> engine.
>>> We're considering using Calcite to replace our current query engine.
>>> 
>>> I looked through the examples and  one of the adapters. One thing I
>> haven't
>>> quite figured out is using Calcite when the data is partitioned and you
>>> need to fan out the query into all the partitions and combine the partial
>>> results.
>>> 
>>> There are several challenges here - for instance, some operations like
>> AVG
>>> are not partitionable, so they have to be rewritten as COUNT and SUM and
>>> only at the final step.
>>> 
>>> We could always write custom code to rewrite the query and reassemble the
>>> results. For instance, I saw a planner rule[1] in the Calcite codebase
>> that
>>> does something similar.
>>> 
>>> Since this is a common problem, I was wondering if there was a standard
>> way
>>> of handling this use case or if there were any example plugin I could
>> look
>>> at.
>>> 
>>> Thanks,
>>> Anoop
>>> 
>>> 
>>> [1]
>>> 
>> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
>> 
>> 


Re: Calcite on Partitioned Databases

Posted by Anoop Johnson <an...@gmail.com>.
Thank you Julian for the response. The SqlSplittableAggFunction SPI looks
useful. I could use this to rewrite queries such that they are
partitionable.

I have not looked closely at how Drill does the distributed query
execution. Our table partitions are distributed over several nodes in a
cluster. Essentially the query co-ordinator will have to do scatter the
query among all the partitions and gather the partial results. For now
we're not planning to repartition the data based on the GROUP BY keys
because it is expensive to do so and most of the fields are low cardinality
and can be safely combined by a simple scatter/gather.

Is there a standardized mechanism for doing the scatter/gather in Calcite?
Even in the absence of it, the scatter part is fairly straightforward. We
could do something like:

1. Query co-ordinator receives the query that needs to run on  a bunch of
table partitions.
2. The query co-ordinator inspects the query and rewrites it. It mainly
splits the non-partitionable aggregate operations (like AVG) into
partitionable aggregate functions.
3. It then fans out the query to all the remote partition servers.

I have not figured out what's the best way to aggregate the partial results
coming back from the remote partitions. It mainly needs to do two things:

1. Combine the partial aggregates for the same group by keys coming from
multiple partitions.
2. Compute the final result - if the original query had split aggregates,
that needs to be combined.

I could write custom code to do this, but if there is a way (or example) to
do this within Calcite, I would appreciate any pointers.

Thanks,
Anoop

On Mon, Mar 7, 2016 at 1:32 PM, Julian Hyde <jh...@apache.org> wrote:

> Some projects that use Calcite (e.g. Drill) model partitioned tables &
> data using the Distribution trait. A trait represents a physical property
> of the data; for example, Collation is another trait, and represents how
> the data is sorted. The Exchange operator can change the distribution of
> data, analogous to how the Sort operator changes the Collation of the data.
>
> As you correctly say, it is not valid to compute aggregate functions on
> partitioned data. You need to combine into a single partition first. (Or
> you can re-partition by the GROUP BY keys, and then safely combine using
> Union-all.)
>
> You can compute partial aggregates on the partitions, then roll them up.
> For example if you want to compute AVG(x) you can expand to SUM(x) /
> COUNT(x), and compute partition SUM(x) and COUNT(X) on each partition then
> sum them using SUM. AggregateUnionTransposeRule and
> AggregateJoinTransposeRule deal in that kind of logic, and
> SqlSplittableAggFunction is an SPI that could describe how an arbitrary
> aggregate function could be split.
>
> Julian
>
>
>
> > On Mar 7, 2016, at 10:58 AM, Anoop Johnson <an...@gmail.com>
> wrote:
> >
> > Hello Everyone -
> >
> > We have an in-house distributed database with our own custom query
> engine.
> > We're considering using Calcite to replace our current query engine.
> >
> > I looked through the examples and  one of the adapters. One thing I
> haven't
> > quite figured out is using Calcite when the data is partitioned and you
> > need to fan out the query into all the partitions and combine the partial
> > results.
> >
> > There are several challenges here - for instance, some operations like
> AVG
> > are not partitionable, so they have to be rewritten as COUNT and SUM and
> > only at the final step.
> >
> > We could always write custom code to rewrite the query and reassemble the
> > results. For instance, I saw a planner rule[1] in the Calcite codebase
> that
> > does something similar.
> >
> > Since this is a common problem, I was wondering if there was a standard
> way
> > of handling this use case or if there were any example plugin I could
> look
> > at.
> >
> > Thanks,
> > Anoop
> >
> >
> > [1]
> >
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
>
>

Re: Calcite on Partitioned Databases

Posted by Julian Hyde <jh...@apache.org>.
Some projects that use Calcite (e.g. Drill) model partitioned tables & data using the Distribution trait. A trait represents a physical property of the data; for example, Collation is another trait, and represents how the data is sorted. The Exchange operator can change the distribution of data, analogous to how the Sort operator changes the Collation of the data. 

As you correctly say, it is not valid to compute aggregate functions on partitioned data. You need to combine into a single partition first. (Or you can re-partition by the GROUP BY keys, and then safely combine using Union-all.)

You can compute partial aggregates on the partitions, then roll them up. For example if you want to compute AVG(x) you can expand to SUM(x) / COUNT(x), and compute partition SUM(x) and COUNT(X) on each partition then sum them using SUM. AggregateUnionTransposeRule and AggregateJoinTransposeRule deal in that kind of logic, and SqlSplittableAggFunction is an SPI that could describe how an arbitrary aggregate function could be split.

Julian



> On Mar 7, 2016, at 10:58 AM, Anoop Johnson <an...@gmail.com> wrote:
> 
> Hello Everyone -
> 
> We have an in-house distributed database with our own custom query engine.
> We're considering using Calcite to replace our current query engine.
> 
> I looked through the examples and  one of the adapters. One thing I haven't
> quite figured out is using Calcite when the data is partitioned and you
> need to fan out the query into all the partitions and combine the partial
> results.
> 
> There are several challenges here - for instance, some operations like AVG
> are not partitionable, so they have to be rewritten as COUNT and SUM and
> only at the final step.
> 
> We could always write custom code to rewrite the query and reassemble the
> results. For instance, I saw a planner rule[1] in the Calcite codebase that
> does something similar.
> 
> Since this is a common problem, I was wondering if there was a standard way
> of handling this use case or if there were any example plugin I could look
> at.
> 
> Thanks,
> Anoop
> 
> 
> [1]
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java