Posted to dev@drill.apache.org by Julian Hyde <jh...@apache.org> on 2015/10/01 23:30:21 UTC

Partial aggregation in Drill-on-Phoenix

Phoenix is able to perform quite a few relational operations on the
region server: scan, filter, project, aggregate, sort (optionally with
limit). However, the sort and aggregate are necessarily "local". They
can only deal with data on that region server, and there needs to be a
further operation to combine the results from the region servers.
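For the sort case, that further operation can be a k-way merge rather than a
full re-sort, because each region server's output is already ordered. A
minimal sketch, assuming integer sort keys and in-memory lists standing in for
the per-region result streams (the class and method names are made up for
illustration and are not Phoenix or Drill code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class MergeSortedPartitions {

  /** One cursor per region-server result: the current head value plus the rest of that stream. */
  private static final class Cursor implements Comparable<Cursor> {
    final int head;
    final Iterator<Integer> rest;

    Cursor(int head, Iterator<Integer> rest) {
      this.head = head;
      this.rest = rest;
    }

    @Override
    public int compareTo(Cursor other) {
      return Integer.compare(head, other.head);
    }
  }

  /** Merges lists that are each already sorted ascending into one sorted list. */
  static List<Integer> merge(List<List<Integer>> sortedPartitions) {
    PriorityQueue<Cursor> heap = new PriorityQueue<>();
    for (List<Integer> partition : sortedPartitions) {
      Iterator<Integer> it = partition.iterator();
      if (it.hasNext()) {
        heap.add(new Cursor(it.next(), it));
      }
    }
    List<Integer> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      // Take the smallest head across all streams, then advance that stream.
      Cursor smallest = heap.poll();
      merged.add(smallest.head);
      if (smallest.rest.hasNext()) {
        heap.add(new Cursor(smallest.rest.next(), smallest.rest));
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<List<Integer>> regions = Arrays.asList(
        Arrays.asList(1, 4, 9),
        Arrays.asList(2, 3, 10),
        Arrays.asList(5, 6));
    System.out.println(merge(regions)); // [1, 2, 3, 4, 5, 6, 9, 10]
  }
}
```

The heap holds one entry per input stream, so the merge costs O(n log k) for n
rows over k streams instead of the O(n log n) of sorting everything again.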

The question is how to plan such queries. I think the answer is an
AggregateExchangeTransposeRule.

The rule would spot an Aggregate on a data source that is split into
multiple locations (partitions) and split it into a partial Aggregate
that computes sub-totals and a summarizing Aggregate that combines
those totals.
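
To make the split concrete, here is a small sketch for COUNT(*) with a
group-by key. It assumes integer keys and uses plain Java maps in place of
real operators; TwoPhaseCount, partialCount and finalCount are hypothetical
names, not part of Calcite, Drill or Phoenix. The partial phase counts local
rows, and the summarizing phase turns COUNT into a SUM over the sub-totals:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TwoPhaseCount {

  /** Phase 1 (runs per partition): COUNT(*) grouped by key over local rows only. */
  static Map<Integer, Long> partialCount(List<Integer> localKeys) {
    Map<Integer, Long> subTotals = new TreeMap<>();
    for (int key : localKeys) {
      subTotals.merge(key, 1L, Long::sum);
    }
    return subTotals;
  }

  /** Phase 2 (runs once, after the exchange): sums the sub-totals per key. */
  static Map<Integer, Long> finalCount(List<Map<Integer, Long>> partials) {
    Map<Integer, Long> totals = new TreeMap<>();
    for (Map<Integer, Long> partial : partials) {
      for (Map.Entry<Integer, Long> e : partial.entrySet()) {
        totals.merge(e.getKey(), e.getValue(), Long::sum);
      }
    }
    return totals;
  }

  public static void main(String[] args) {
    List<Integer> regionA = Arrays.asList(1, 2, 2, 3);
    List<Integer> regionB = Arrays.asList(2, 3, 3);
    Map<Integer, Long> result =
        finalCount(Arrays.asList(partialCount(regionA), partialCount(regionB)));
    System.out.println(result); // {1=1, 2=3, 3=3}
  }
}
```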

How does the planner know that the Aggregate needs to be split? Since
the data's distribution has changed, there would need to be an
Exchange operator. It is the Exchange operator that triggers the rule
to fire.
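
As a rough picture of what such a rule does to a plan, the sketch below
rewrites a toy operator tree. The Node class and the operator names
"PartialAggregate" and "FinalAggregate" are invented for this example; a real
rule would be written against Calcite's planner classes, which are not shown
here.

```java
final class AggregateExchangeTransposeSketch {

  /** Minimal plan node: an operator name plus at most one input. Hypothetical, not a Calcite RelNode. */
  static final class Node {
    final String op;
    final Node input; // null for a leaf

    Node(String op, Node input) {
      this.op = op;
      this.input = input;
    }

    @Override
    public String toString() {
      return input == null ? op : op + "(" + input + ")";
    }
  }

  /**
   * If the tree is an Aggregate directly on top of an Exchange, pushes a partial
   * Aggregate below the Exchange and leaves a summarizing Aggregate above it.
   * Any other tree is returned unchanged.
   */
  static Node transpose(Node node) {
    boolean matches = node.op.equals("Aggregate")
        && node.input != null
        && node.input.op.equals("Exchange");
    if (!matches) {
      return node;
    }
    Node source = node.input.input;
    Node partial = new Node("PartialAggregate", source);
    Node exchange = new Node("Exchange", partial);
    return new Node("FinalAggregate", exchange);
  }

  public static void main(String[] args) {
    Node plan = new Node("Aggregate", new Node("Exchange", new Node("Scan", null)));
    System.out.println(transpose(plan));
    // FinalAggregate(Exchange(PartialAggregate(Scan)))
  }
}
```

Matching on the Exchange keeps the rule from firing on single-location
inputs, where a partial phase would only add overhead.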

There are some special cases. If the data is sorted as well as
partitioned (say because the local aggregate uses a sort-based
algorithm) we could maybe use a more efficient plan. And if the
partition key is the same as the aggregation key we don't need a
summarizing Aggregate, just a Union.
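
The Union case can be checked with a small experiment. The sketch below
assumes rows are routed to partitions by the group-by key itself (here, key
modulo the partition count), so no key appears in more than one partition.
All names are hypothetical and the maps stand in for real operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class UnionOfDisjointPartials {

  /** Routes each row to a partition by its group-by key, so a key never spans partitions. */
  static List<List<Integer>> partitionByKey(List<Integer> keys, int partitions) {
    List<List<Integer>> out = new ArrayList<>();
    for (int i = 0; i < partitions; i++) {
      out.add(new ArrayList<Integer>());
    }
    for (int key : keys) {
      out.get(Math.floorMod(key, partitions)).add(key);
    }
    return out;
  }

  /** Local COUNT(*) GROUP BY key. */
  static Map<Integer, Long> count(List<Integer> keys) {
    Map<Integer, Long> counts = new TreeMap<>();
    for (int key : keys) {
      counts.merge(key, 1L, Long::sum);
    }
    return counts;
  }

  /** Union: copies every partial row through unchanged; fails if a key shows up twice. */
  static Map<Integer, Long> union(List<Map<Integer, Long>> partials) {
    Map<Integer, Long> all = new TreeMap<>();
    for (Map<Integer, Long> partial : partials) {
      for (Map.Entry<Integer, Long> e : partial.entrySet()) {
        if (all.put(e.getKey(), e.getValue()) != null) {
          throw new IllegalStateException("key in two partitions: " + e.getKey());
        }
      }
    }
    return all;
  }

  public static void main(String[] args) {
    List<Integer> keys = Arrays.asList(0, 1, 2, 3, 1, 2, 2);
    List<Map<Integer, Long>> partials = new ArrayList<>();
    for (List<Integer> partition : partitionByKey(keys, 2)) {
      partials.add(count(partition));
    }
    // The union of the local aggregates equals the aggregate over all rows.
    System.out.println(union(partials).equals(count(keys))); // true
  }
}
```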

It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix
scenario, once the Aggregate has been pushed through the Exchange
(i.e. onto the drill-bit residing on the region server) we can then
push the DrillAggregate across the drill-to-phoenix membrane and make
it into a PhoenixServerAggregate that executes in the region server.

Related issues:
* https://issues.apache.org/jira/browse/DRILL-3840
* https://issues.apache.org/jira/browse/CALCITE-751

Julian

Re: Partial aggregation in Drill-on-Phoenix

Posted by James Taylor <ja...@apache.org>.
Changes look great, Maryann. Would you mind pulling those in, Jacques?

Thanks,
James

On Mon, Oct 19, 2015 at 11:50 AM, Maryann Xue <ma...@gmail.com> wrote:

> Made another two check-ins to https://github.com/jacques-n/drill/pull/5,
> first one for the changes James had suggested. The second check-in included
> some test cases that failed to use Phoenix partial aggregate because of
> https://issues.apache.org/jira/browse/CALCITE-926.
>
> I also reproduced the problem with Phoenix+Calcite, but will make a new
> patch for CALCITE-926 to add some standalone test cases for Calcite.
>
>
> Thanks,
> Maryann
>
> On Fri, Oct 9, 2015 at 1:30 PM, James Taylor <ja...@apache.org>
> wrote:
>
>> Thanks for the updates to the patch, Maryann. It's looking very good -
>> this will perform better I believe. I made a few comments on the pull
>> request.
>>
>> FYI, I filed PHOENIX-2316 to add the missing information (namely the
>> region server that the parallelized scan will go to) so that I can improve
>> the assignment logic.
>>
>>      James
>>
>> On Wed, Oct 7, 2015 at 1:11 PM, Maryann Xue <ma...@gmail.com>
>> wrote:
>>
>>> Made another checkin for the pull request. All good now.
>>>
>>> In order to compile and run, be sure to update the Phoenix project under
>>> Julian's branch.
>>>
>>>
>>> Thanks,
>>> Maryann
>>>
>>> On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <ja...@dremio.com>
>>> wrote:
>>>
>>>> I just filed a jira for the merge issue:
>>>>
>>>> https://issues.apache.org/jira/browse/DRILL-3907
>>>>
>>>> --
>>>> Jacques Nadeau
>>>> CTO and Co-Founder, Dremio
>>>>
>>>> On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <ja...@dremio.com>
>>>> wrote:
>>>>
>>>>> Drill doesn't currently have a merge-sort operation available outside
>>>>> the context of an exchange. See here:
>>>>>
>>>>>
>>>>> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>>>>>
>>>>> We'll need to do a bit of refactoring to provide this functionality
>>>>> outside the context of an exchange. The one other thing we'll have to think
>>>>> about in this context is how do we avoid doing an n-way merge in the case
>>>>> that we're not using the collation.
>>>>>
>>>>> --
>>>>> Jacques Nadeau
>>>>> CTO and Co-Founder, Dremio
>>>>>
>>>>> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <ma...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> One thing from what I asked James offline yesterday, and maybe we can
>>>>>> discuss a little bit in today's meeting:
>>>>>>
>>>>>> Phoenix uses a list of lists of Scan objects to indicate Region
>>>>>> boundaries and guideposts, and if the top-level list contains more than one
>>>>>> element it means that the results from different Scanner/ResultIterator
>>>>>> should be merge-sorted. We now use this list in Drill integration to
>>>>>> generate different batches or slices. I see from the Drill plan of a simple
>>>>>> select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of
>>>>>> the PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
>>>>>> So optimally,
>>>>>> 1) this should be a merge-sort (to be more accurate, a merge)
>>>>>> 2) furthermore, if Drill has something to indicate the order among
>>>>>> slices and batches, we could even turn it into a concat.
>>>>>>
>>>>>> The structure of this Scan list might be helpful for 2), or we may
>>>>>> have some Logical representation for this. Otherwise, we can simply flatten
>>>>>> this list to a one-dimensional list as we do now (in my check-in yesterday).
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Maryann
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <ma...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, but the partially aggregated results will not contain any
>>>>>>> duplicate rowkeys, since they are also group-by keys. What we need is the
>>>>>>> aggregators and call aggregate for each row. We can write a new simpler
>>>>>>> ResultIterator to replace this, but for now it should work correctly.
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestaylor@apache.org> wrote:
>>>>>>>
>>>>>>>> The results we get back from the server-side scan are already the
>>>>>>>> partial aggregated values we need. GroupedAggregatingResultIterator
>>>>>>>> will collapse adjacent Tuples together which happen to have the same row
>>>>>>>> key. I'm not sure we want/need this to happen. Instead I think we just need
>>>>>>>> to decode the aggregated values directly from the result of the scan.
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <ma...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi James,
>>>>>>>>>
>>>>>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>>>>> back from the scan?
>>>>>>>>>
>>>>>>>>> It is necessary. I suppose what we should return as a partial
>>>>>>>>> result from PhoenixRecordReader is exactly the same as what we do in
>>>>>>>>> standalone Phoenix+Calcite, except that the result is partial or say
>>>>>>>>> incomplete. For example, we have "select a, count(*) from t group by a", we
>>>>>>>>> should return rows that have "a" as the first expression value, and
>>>>>>>>> "count(*)" as the second expression value. For this "count" expression, it
>>>>>>>>> actually needs a ClientAggregator for evaluation, and that's what this
>>>>>>>>> GroupedAggregatingResultIterator is used for.
>>>>>>>>> Since "each server-side scan will produce results with a single
>>>>>>>>> tuple per group by key", and PhoenixRecordReader is only dealing with one
>>>>>>>>> server-side result each time, we don't care how the group-by keys are
>>>>>>>>> arranged (ordered or unordered). Actually
>>>>>>>>> GroupedAggregatingResultIterator is not the group-by iterator we
>>>>>>>>> use for AggregatePlan. It does not "combine". It treats every row as a
>>>>>>>>> different group, by returning its rowkey as the group-by key (
>>>>>>>>> GroupedAggregatingResultIterator.java:56).
>>>>>>>>>
>>>>>>>>> In short, this iterator is for decoding the server-side values. So
>>>>>>>>> we may want to optimize this logic by removing this serialization and
>>>>>>>>> deserialization and having only one set of aggregators in future.
>>>>>>>>>
>>>>>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>>>>>> scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>>>>>>>> as each of our scans could include duplicate group by keys. Is it ok to
>>>>>>>>> combine them in this case?
>>>>>>>>>
>>>>>>>>> It should not matter, or at least is not related to the problem
>>>>>>>>> I'm now having.
>>>>>>>>>
>>>>>>>>> bq. One more question: how is the group by key communicated back
>>>>>>>>> to Drill?
>>>>>>>>>
>>>>>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>>>>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>>>>>>>> PhoenixHashAggPrule). I assume then the partial results get shuffled based
>>>>>>>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>>>>>>>> final step is the Drill hash aggregation.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is my test table "A.BEER", which has four columns: "B", "E1",
>>>>>>>>> "E2", "R", all of INTEGER types. And the data is generated like this:
>>>>>>>>> for (x=1 to N) { //currently N=1000
>>>>>>>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>>>>>>> GROUP BY e1".
>>>>>>>>> The expected result should be:
>>>>>>>>> 0 100
>>>>>>>>> 1 100
>>>>>>>>> 2 100
>>>>>>>>> 3 100
>>>>>>>>> 4 100
>>>>>>>>> 5 100
>>>>>>>>> 6 100
>>>>>>>>> 7 100
>>>>>>>>> 8 100
>>>>>>>>> 9 100
>>>>>>>>> The actual result was:
>>>>>>>>> 6 0
>>>>>>>>> 7 0
>>>>>>>>> 8 0
>>>>>>>>> 9 0
>>>>>>>>> 0 0
>>>>>>>>> 1 100
>>>>>>>>> 2 100
>>>>>>>>> 3 100
>>>>>>>>> 4 100
>>>>>>>>> 5 100
>>>>>>>>>
>>>>>>>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer
>>>>>>>>> GROUP BY e2".
>>>>>>>>> Similarly, the expected result should have group-by keys from 0 to
>>>>>>>>> 99, each having a value of 10 as the count, while the actual result was:
>>>>>>>>> from group-by key 86 to 99, together with 0, their count values
>>>>>>>>> were all 0; the rest (1 to 85) all had the correct value 10.
>>>>>>>>>
>>>>>>>>> Looks to me that the scans were good but there was a problem with
>>>>>>>>> one of the hash buckets.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Maryann
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <
>>>>>>>>> jamestaylor@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Nice progress, Maryann.
>>>>>>>>>>
>>>>>>>>>> A few questions for you: not sure I understand the changes you
>>>>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>>>>>> back from the scan?
>>>>>>>>>>
>>>>>>>>>> Also, not sure what impact it has in the way we "combine" the
>>>>>>>>>> scans in our Drill parallelization code
>>>>>>>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could include
>>>>>>>>>> duplicate group by keys. Is it ok to combine them in this case?
>>>>>>>>>>
>>>>>>>>>> One more question: how is the group by key communicated back to
>>>>>>>>>> Drill?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> James
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <
>>>>>>>>>> maryann.xue@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Added a few fixes in the pull request. Tested with two regions,
>>>>>>>>>>> turned out that half of the result is empty (count = 0).
>>>>>>>>>>> Not sure if there's anything wrong with
>>>>>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>>>>>> .
>>>>>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>>>>>>
>>>>>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>>>>>
>>>>>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>> index b911f6b..58bc918 100644
>>>>>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>>>> -      return false;
>>>>>>>>>>> -    }
>>>>>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>>>> +//      return false;
>>>>>>>>>>> +//    }
>>>>>>>>>>>
>>>>>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Maryann
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Drill's current approach seems adequate for Drill alone but
>>>>>>>>>>>> extending
>>>>>>>>>>>> it to a heterogeneous system that includes Phoenix seems like a
>>>>>>>>>>>> hack.
>>>>>>>>>>>>
>>>>>>>>>>>> I think you should only create Prels for algebra nodes that you
>>>>>>>>>>>> know
>>>>>>>>>>>> for sure are going to run on the Drill engine. If there's a
>>>>>>>>>>>> possibility that it would run in another engine such as Phoenix
>>>>>>>>>>>> then
>>>>>>>>>>>> they should still be logical.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <
>>>>>>>>>>>> maryann.xue@gmail.com> wrote:
>>>>>>>>>>>> > The partial aggregate seems to be working now, with one
>>>>>>>>>>>> interface extension
>>>>>>>>>>>> > and one bug fix in the Phoenix project. Will do some code
>>>>>>>>>>>> cleanup and
>>>>>>>>>>>> > create a pull request soon.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Still there was a hack in the Drill project which I made to
>>>>>>>>>>>> force 2-phase
>>>>>>>>>>>> > aggregation. I'll try to fix that.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Jacques, I have one question though, how can I verify that
>>>>>>>>>>>> there are more
>>>>>>>>>>>> > than one slice and the shuffle happens?
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>> > Maryann
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <
>>>>>>>>>>>> jamestaylor@apache.org> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> >> Maryann,
>>>>>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring
>>>>>>>>>>>> is required
>>>>>>>>>>>> >> for a merge sort to occur - there's something that does
>>>>>>>>>>>> that, but it's not
>>>>>>>>>>>> >> expected to be used in this context currently.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> IMHO, there's more of a clear value in getting the
>>>>>>>>>>>> aggregation to use
>>>>>>>>>>>> >> Phoenix first, so I'd recommend going down that road as
>>>>>>>>>>>> Jacques mentioned
>>>>>>>>>>>> >> above if possible. Once that's working, we can circle back
>>>>>>>>>>>> to the partial
>>>>>>>>>>>> >> sort.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Thoughts?
>>>>>>>>>>>> >> James
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <
>>>>>>>>>>>> maryann.xue@gmail.com>
>>>>>>>>>>>> >> wrote:
>>>>>>>>>>>> >>
>>>>>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured
>>>>>>>>>>>> might be a
>>>>>>>>>>>> >>> little easier to start with than partial aggregation. But I
>>>>>>>>>>>> found that even
>>>>>>>>>>>> >>> though the code worked (returned the right results), the
>>>>>>>>>>>> Drill side sort
>>>>>>>>>>>> >>> turned out to be an ordinary sort instead of a merge which
>>>>>>>>>>>> it should have
>>>>>>>>>>>> >>> been. Any idea of how to fix that?
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Thanks,
>>>>>>>>>>>> >>> Maryann
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <
>>>>>>>>>>>> jacques@dremio.com>
>>>>>>>>>>>> >>> wrote:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>>>>>>>> providing a rule
>>>>>>>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix
>>>>>>>>>>>> convention as
>>>>>>>>>>>> >>>> input. It would replace everywhere but would only be
>>>>>>>>>>>> plannable when it is
>>>>>>>>>>>> >>>> the first phase of aggregation.
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Thoughts?
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> --
>>>>>>>>>>>> >>>> Jacques Nadeau
>>>>>>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>>>>>>> >>>>

Re: Partial aggregation in Drill-on-Phoenix

Posted by Maryann Xue <ma...@gmail.com>.
Made another two check-ins to https://github.com/jacques-n/drill/pull/5,
first one for the changes James had suggested. The second check-in included
some test cases that failed to use Phoenix partial aggregate because of
https://issues.apache.org/jira/browse/CALCITE-926.

I also reproduced the problem with Phoenix+Calcite, but will make a new
patch for CALCITE-926 to add some standalone test cases for Calcite.


Thanks,
Maryann

On Fri, Oct 9, 2015 at 1:30 PM, James Taylor <ja...@apache.org> wrote:

> Thanks for the updates to the patch, Maryann. It's looking very good -
> this will perform better I believe. I made a few comments on the pull
> request.
>
> FYI, I filed PHOENIX-2316 to add the missing information (namely the
> region server that the parallelized scan will go to) so that I can improve
> the assignment logic.
>
>      James
>
> On Wed, Oct 7, 2015 at 1:11 PM, Maryann Xue <ma...@gmail.com> wrote:
>
>> Made another checkin for the pull request. All good now.
>>
>> In order to compile and run, be sure to update the Phoenix project under
>> Julian's branch.
>>
>>
>> Thanks,
>> Maryann
>>
>> On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <ja...@dremio.com>
>> wrote:
>>
>>> I just filed a jira for the merge issue:
>>>
>>> https://issues.apache.org/jira/browse/DRILL-3907
>>>
>>> --
>>> Jacques Nadeau
>>> CTO and Co-Founder, Dremio
>>>
>>> On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <ja...@dremio.com>
>>> wrote:
>>>
>>>> Drill doesn't currently have a merge-sort operation available outside
>>>> the context of an exchange. See here:
>>>>
>>>>
>>>> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>>>>
>>>> We'll need to do a bit of refactoring to provide this functionality
>>>> outside the context of an exchange. The one other thing we'll have to think
>>>> about in this context is how do we avoid doing an n-way merge in the case
>>>> that we're not using the collation.
>>>>
>>>> --
>>>> Jacques Nadeau
>>>> CTO and Co-Founder, Dremio
>>>>
>>>> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> One thing from what I asked James offline yesterday, and maybe we can
>>>>> discuss a little bit in today's meeting:
>>>>>
>>>>> Phoenix uses a list of lists of Scan objects to indicate Region
>>>>> boundaries and guideposts, and if the top-level list contains more than one
>>>>> element it means that the results from different Scanner/ResultIterator
>>>>> should be merge-sorted. We now use this list in Drill integration to
>>>>> generate different batches or slices. I see from the Drill plan of a simple
>>>>> select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of
>>>>> the PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
>>>>> So optimally,
>>>>> 1) this should be a merge-sort (to be more accurate, a merge)
>>>>> 2) furthermore, if Drill has something to indicate the order among
>>>>> slices and batches, we could even turn it into a concat.
>>>>>
>>>>> The structure of this Scan list might be helpful for 2), or we may
>>>>> have some Logical representation for this. Otherwise, we can simply flatten
>>>>> this list to a one-dimensional list as we do now (in my check-in yesterday).
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Maryann
>>>>>
>>>>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <ma...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, but the partially aggregated results will not contain any
>>>>>> duplicate rowkeys, since they are also group-by keys. What we need is the
>>>>>> aggregators and call aggregate for each row. We can write a new simpler
>>>>>> ResultIterator to replace this, but for now it should work correctly.
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <ja...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> The results we get back from the server-side scan are already the
>>>>>>> partial aggregated values we need. GroupedAggregatingResultIterator
>>>>>>> will collapse adjacent Tuples together which happen to have the same row
>>>>>>> key. I'm not sure we want/need this to happen. Instead I think we just need
>>>>>>> to decode the aggregated values directly from the result of the scan.
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi James,
>>>>>>>>
>>>>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>>>> back from the scan?
>>>>>>>>
>>>>>>>> It is necessary. I suppose what we should return as a partial
>>>>>>>> result from PhoenixRecordReader is exactly the same as what we do in
>>>>>>>> standalone Phoenix+Calcite, except that the result is partial or say
>>>>>>>> incomplete. For example, we have "select a, count(*) from t group by a", we
>>>>>>>> should return rows that have "a" as the first expression value, and
>>>>>>>> "count(*)" as the second expression value. For this "count" expression, it
>>>>>>>> actually needs a ClientAggregator for evaluation, and that's what this
>>>>>>>> GroupedAggregatingResultIterator is used for.
>>>>>>>> Since "each server-side scan will produce results with a single
>>>>>>>> tuple per group by key", and PhoenixRecordReader is only dealing with one
>>>>>>>> server-side result each time, we don't care how the group-by keys are
>>>>>>>> arranged (ordered or unordered). Actually
>>>>>>>> GroupedAggregatingResultIterator is not the group-by iterator we
>>>>>>>> use for AggregatePlan. It does not "combine". It treats every row as a
>>>>>>>> different group, by returning its rowkey as the group-by key (
>>>>>>>> GroupedAggregatingResultIterator.java:56).
>>>>>>>>
>>>>>>>> In short, this iterator is for decoding the server-side values. So
>>>>>>>> we may want to optimize this logic by removing this serialization and
>>>>>>>> deserialization and having only one set of aggregators in future.
>>>>>>>>
>>>>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>>>>> scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>>>>>>> as each of our scans could include duplicate group by keys. Is it ok to
>>>>>>>> combine them in this case?
>>>>>>>>
>>>>>>>> It should not matter, or at least is not related to the problem I'm
>>>>>>>> now having.
>>>>>>>>
>>>>>>>> bq. One more question: how is the group by key communicated back to
>>>>>>>> Drill?
>>>>>>>>
>>>>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>>>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>>>>>>> PhoenixHashAggPrule). I assume then the partial results get shuffled based
>>>>>>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>>>>>>> final step is the Drill hash aggregation.
>>>>>>>>
>>>>>>>>
>>>>>>>> This is my test table "A.BEER", which has four columns: "B", "E1",
>>>>>>>> "E2", "R", all of INTEGER types. And the data is generated like this:
>>>>>>>> for (x=1 to N) { //currently N=1000
>>>>>>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>>>>> }
>>>>>>>>
>>>>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>>>>>> GROUP BY e1".
>>>>>>>> The expected result should be:
>>>>>>>> 0 100
>>>>>>>> 1 100
>>>>>>>> 2 100
>>>>>>>> 3 100
>>>>>>>> 4 100
>>>>>>>> 5 100
>>>>>>>> 6 100
>>>>>>>> 7 100
>>>>>>>> 8 100
>>>>>>>> 9 100
>>>>>>>> The actual result was:
>>>>>>>> 6 0
>>>>>>>> 7 0
>>>>>>>> 8 0
>>>>>>>> 9 0
>>>>>>>> 0 0
>>>>>>>> 1 100
>>>>>>>> 2 100
>>>>>>>> 3 100
>>>>>>>> 4 100
>>>>>>>> 5 100
>>>>>>>>
>>>>>>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer
>>>>>>>> GROUP BY e2".
>>>>>>>> Similarly, the expected result should have group-by keys from 0 to
>>>>>>>> 99, each having a value of 10 as the count, while the actual result was:
>>>>>>>> from group-by key 86 to 99, together with 0, their count values
>>>>>>>> were all 0; the rest (1 to 85) all had the correct value 10.
>>>>>>>>
>>>>>>>> Looks to me that the scans were good but there was a problem with
>>>>>>>> one of the hash buckets.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Maryann
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <
>>>>>>>> jamestaylor@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Nice progress, Maryann.
>>>>>>>>>
>>>>>>>>> A few questions for you: not sure I understand the changes you
>>>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>>>>> back from the scan?
>>>>>>>>>
>>>>>>>>> Also, not sure what impact it has in the way we "combine" the
>>>>>>>>> scans in our Drill parallelization code
>>>>>>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could include
>>>>>>>>> duplicate group by keys. Is it ok to combine them in this case?
>>>>>>>>>
>>>>>>>>> One more question: how is the group by key communicated back to
>>>>>>>>> Drill?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> James
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann.xue@gmail.com
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> Added a few fixes in the pull request. Tested with two regions,
>>>>>>>>>> turned out that half of the result is empty (count = 0).
>>>>>>>>>> Not sure if there's anything wrong with
>>>>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>>>>> .
>>>>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>>>>>
>>>>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>>>>
>>>>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>> index b911f6b..58bc918 100644
>>>>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>>> -      return false;
>>>>>>>>>> -    }
>>>>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>>> +//      return false;
>>>>>>>>>> +//    }
>>>>>>>>>>
>>>>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Maryann
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Drill's current approach seems adequate for Drill alone but
>>>>>>>>>>> extending
>>>>>>>>>>> it to a heterogeneous system that includes Phoenix seems like a
>>>>>>>>>>> hack.
>>>>>>>>>>>
>>>>>>>>>>> I think you should only create Prels for algebra nodes that you
>>>>>>>>>>> know
>>>>>>>>>>> for sure are going to run on the Drill engine. If there's a
>>>>>>>>>>> possibility that it would run in another engine such as Phoenix
>>>>>>>>>>> then
>>>>>>>>>>> they should still be logical.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <
>>>>>>>>>>> maryann.xue@gmail.com> wrote:
>>>>>>>>>>> > The partial aggregate seems to be working now, with one
>>>>>>>>>>> interface extension
>>>>>>>>>>> > and one bug fix in the Phoenix project. Will do some code
>>>>>>>>>>> cleanup and
>>>>>>>>>>> > create a pull request soon.
>>>>>>>>>>> >
>>>>>>>>>>> > Still there was a hack in the Drill project which I made to
>>>>>>>>>>> force 2-phase
>>>>>>>>>>> > aggregation. I'll try to fix that.
>>>>>>>>>>> >
>>>>>>>>>>> > Jacques, I have one question though, how can I verify that
>>>>>>>>>>> there is more
>>>>>>>>>>> > than one slice and the shuffle happens?
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks,
>>>>>>>>>>> > Maryann
>>>>>>>>>>> >
>>>>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <
>>>>>>>>>>> jamestaylor@apache.org> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >> Maryann,
>>>>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring
>>>>>>>>>>> is required
>>>>>>>>>>> >> for a merge sort to occur - there's something that does that,
>>>>>>>>>>> but it's not
>>>>>>>>>>> >> expected to be used in this context currently.
>>>>>>>>>>> >>
>>>>>>>>>>> >> IMHO, there's more of a clear value in getting the
>>>>>>>>>>> aggregation to use
>>>>>>>>>>> >> Phoenix first, so I'd recommend going down that road as
>>>>>>>>>>> Jacques mentioned
>>>>>>>>>>> >> above if possible. Once that's working, we can circle back to
>>>>>>>>>>> the partial
>>>>>>>>>>> >> sort.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Thoughts?
>>>>>>>>>>> >> James
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <
>>>>>>>>>>> maryann.xue@gmail.com>
>>>>>>>>>>> >> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured
>>>>>>>>>>> might be a
>>>>>>>>>>> >>> little easier to start with than partial aggregation. But I
>>>>>>>>>>> found that even
>>>>>>>>>>> >>> though the code worked (returned the right results), the
>>>>>>>>>>> Drill side sort
>>>>>>>>>>> >>> turned out to be an ordinary sort instead of a merge which it
>>>>>>>>>>> should have
>>>>>>>>>>> >>> been. Any idea of how to fix that?
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Thanks,
>>>>>>>>>>> >>> Maryann
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <
>>>>>>>>>>> jacques@dremio.com>
>>>>>>>>>>> >>> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>
>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>
>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>
>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>>>>>>> providing a rule
>>>>>>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix
>>>>>>>>>>> convention as
>>>>>>>>>>> >>>> input. It would replace everywhere but would only be
>>>>>>>>>>> plannable when it is
>>>>>>>>>>> >>>> the first phase of aggregation.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Thoughts?
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> --
>>>>>>>>>>> >>>> Jacques Nadeau
>>>>>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <
>>>>>>>>>>> jhyde@apache.org> wrote:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>> Phoenix is able to perform quite a few relational
>>>>>>>>>>> operations on the
>>>>>>>>>>> >>>>> region server: scan, filter, project, aggregate, sort
>>>>>>>>>>> (optionally with
>>>>>>>>>>> >>>>> limit). However, the sort and aggregate are necessarily
>>>>>>>>>>> "local". They
>>>>>>>>>>> >>>>> can only deal with data on that region server, and there
>>>>>>>>>>> needs to be a
>>>>>>>>>>> >>>>> further operation to combine the results from the region
>>>>>>>>>>> servers.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> The question is how to plan such queries. I think the
>>>>>>>>>>> answer is an
>>>>>>>>>>> >>>>> AggregateExchangeTransposeRule.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is
>>>>>>>>>>> split into
>>>>>>>>>>> >>>>> multiple locations (partitions) and split it into a
>>>>>>>>>>> partial Aggregate
>>>>>>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that
>>>>>>>>>>> combines
>>>>>>>>>>> >>>>> those totals.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be
>>>>>>>>>>> split? Since
>>>>>>>>>>> >>>>> the data's distribution has changed, there would need to
>>>>>>>>>>> be an
>>>>>>>>>>> >>>>> Exchange operator. It is the Exchange operator that
>>>>>>>>>>> triggers the rule
>>>>>>>>>>> >>>>> to fire.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> There are some special cases. If the data is sorted as
>>>>>>>>>>> well as
>>>>>>>>>>> >>>>> partitioned (say because the local aggregate uses a
>>>>>>>>>>> sort-based
>>>>>>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And
>>>>>>>>>>> if the
>>>>>>>>>>> >>>>> partition key is the same as the aggregation key we don't
>>>>>>>>>>> need a
>>>>>>>>>>> >>>>> summarizing Aggregate, just a Union.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>>>>>>> Drill-on-Phoenix
>>>>>>>>>>> >>>>> scenario, once the Aggregate has been pushed through the
>>>>>>>>>>> Exchange
>>>>>>>>>>> >>>>> (i.e. onto the drill-bit residing on the region server) we
>>>>>>>>>>> can then
>>>>>>>>>>> >>>>> push the DrillAggregate across the drill-to-phoenix
>>>>>>>>>>> membrane and make
>>>>>>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the
>>>>>>>>>>> region server.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> Related issues:
>>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> Julian
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>
>>>>>>>>>>> >>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

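For reference, the expected counts in the test quoted above can be reproduced with a small standalone sketch of two-phase aggregation. It is illustrative only and does not use any Drill or Phoenix API: the class TwoPhaseCountSketch, its method names, and the split of the row-key range into two "regions" are assumptions made for the example. What it shows is that the summarizing phase has to sum the partial counts per group key rather than count the partial rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TwoPhaseCountSketch {

    // Column positions in a row (B, E1, E2, R), mirroring the test table.
    static final int E1 = 1;
    static final int E2 = 2;

    // Builds rows (x, x % 10, x % 100, x) for x = 1..n and splits them into
    // two hypothetical regions by row-key range (an assumption for the example).
    static List<List<int[]>> generateRegions(int n) {
        List<int[]> lower = new ArrayList<>();
        List<int[]> upper = new ArrayList<>();
        for (int x = 1; x <= n; x++) {
            int[] row = {x, x % 10, x % 100, x};
            if (x <= n / 2) {
                lower.add(row);
            } else {
                upper.add(row);
            }
        }
        return Arrays.asList(lower, upper);
    }

    // Phase 1 (the local, per-region aggregate): count rows per group key.
    static Map<Integer, Long> partialCount(List<int[]> rows, int keyColumn) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (int[] row : rows) {
            counts.merge(row[keyColumn], 1L, Long::sum);
        }
        return counts;
    }

    // Phase 2 (the summarizing aggregate): add up the partial counts per key.
    static Map<Integer, Long> finalCount(List<Map<Integer, Long>> partials) {
        Map<Integer, Long> totals = new TreeMap<>();
        for (Map<Integer, Long> partial : partials) {
            partial.forEach((key, count) -> totals.merge(key, count, Long::sum));
        }
        return totals;
    }

    // Runs both phases over the generated data for the given group-by column.
    static Map<Integer, Long> twoPhaseCount(int n, int keyColumn) {
        List<Map<Integer, Long>> partials = new ArrayList<>();
        for (List<int[]> region : generateRegions(n)) {
            partials.add(partialCount(region, keyColumn));
        }
        return finalCount(partials);
    }

    public static void main(String[] args) {
        // With n = 1000, each of the keys 0..9 maps to 100 when grouping by E1.
        System.out.println(twoPhaseCount(1000, E1));
    }
}
```

Grouping by E2 instead (twoPhaseCount(1000, E2)) gives 100 keys with a count of 10 each, which matches the second expected result described in the quoted message.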
Re: Partial aggregation in Drill-on-Phoenix

Posted by James Taylor <ja...@apache.org>.
Thanks for the updates to the patch, Maryann. It's looking very good - this
will perform better I believe. I made a few comments on the pull request.

FYI, I filed PHOENIX-2316 to add the missing information (namely the region
server that the parallelized scan will go to) so that I can improve the
assignment logic.

     James

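As a side note on the temporary change to create2PhasePlan quoted in this thread: the guard that was commented out can be restated as a small pure function. This is only a sketch of the condition visible in that diff, not Drill code. The class TwoPhaseGateSketch, the method allowsTwoPhase, and the slice target value used in main are hypothetical, and the real method goes on to check the aggregate function names, which is not modelled here.

```java
class TwoPhaseGateSketch {

    // Mirrors the guard in the quoted diff: a two-phase plan is rejected when
    // multi-phase aggregation is disabled, the planner is in single mode, or
    // the estimated input row count is below the slice target.
    static boolean allowsTwoPhase(boolean multiPhaseAggEnabled,
                                  boolean singleMode,
                                  double inputRows,
                                  long sliceTarget) {
        boolean smallInput = inputRows < sliceTarget;
        return multiPhaseAggEnabled && !singleMode && !smallInput;
    }

    public static void main(String[] args) {
        // Hypothetical slice target; a 1000-row test table falls below it,
        // so the guard would reject the two-phase plan.
        long assumedSliceTarget = 100_000L;
        System.out.println(allowsTwoPhase(true, false, 1000, assumedSliceTarget));
    }
}
```

Under these assumptions a small test table never passes the guard, which is consistent with the guard having been disabled to force a 2-phase HashAgg during testing.
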
On Wed, Oct 7, 2015 at 1:11 PM, Maryann Xue <ma...@gmail.com> wrote:

> Made another checkin for the pull request. All good now.
>
> In order to compile and run, be sure to update the Phoenix project under
> Julian's branch.
>
>
> Thanks,
> Maryann
>
> On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <ja...@dremio.com>
> wrote:
>
>> I just filed a jira for the merge issue:
>>
>> https://issues.apache.org/jira/browse/DRILL-3907
>>
>> --
>> Jacques Nadeau
>> CTO and Co-Founder, Dremio
>>
>> On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <ja...@dremio.com>
>> wrote:
>>
>>> Drill doesn't currently have a merge-sort operation available outside
>>> the context of an exchange. See here:
>>>
>>>
>>> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>>>
>>> We'll need to do a bit of refactoring to provide this functionality
>>> outside the context of an exchange. The one other thing we'll have to think
>>> about in this context is how we avoid doing an n-way merge in the case
>>> that we're not using the collation.
>>>
>>> --
>>> Jacques Nadeau
>>> CTO and Co-Founder, Dremio
>>>
>>> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <ma...@gmail.com>
>>> wrote:
>>>
>>>> One thing from what I asked James offline yesterday, and maybe we can
>>>> discuss a little bit in today's meeting:
>>>>
>>>> Phoenix uses a list of lists of Scan objects to indicate Region
>>>> boundaries and guideposts, and if the top-level list contains more than one
>>>> element it means that the results from different Scanner/ResultIterator
>>>> should be merge-sorted. We now use this list in Drill integration to
>>>> generate different batches or slices. I see from the Drill plan of a simple
>>>> select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of
>>>> the PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
>>>> So optimally,
>>>> 1) this should be a merge-sort (to be more accurate, a merge)
>>>> 2) furthermore, if Drill has something to indicate the order among
>>>> slices and batches, we could even turn it into a concat.
>>>>
>>>> The structure of this Scan list might be helpful for 2), or we may have
>>>> some Logical representation for this. Otherwise, we can simply flatten this
>>>> list to a one-dimensional list as we do now (in my ci yesterday).
>>>>
>>>>
>>>>
>>>> Thanks,
>>>> Maryann
>>>>
>>>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> Yes, but the partially aggregated results will not contain any
>>>>> duplicate rowkeys, since they are also group-by keys. What we need are the
>>>>> aggregators, and to call aggregate for each row. We can write a new simpler
>>>>> ResultIterator to replace this, but for now it should work correctly.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <ja...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> The results we get back from the server-side scan are already the
>>>>>> partial aggregated values we need. GroupedAggregatingResultIterator
>>>>>> will collapse adjacent Tuples together which happen to have the same row
>>>>>> key. I'm not sure we want/need this to happen. Instead I think we just need
>>>>>> to decode the aggregated values directly from the result of the scan.
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <ma...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi James,
>>>>>>>
>>>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>>> back from the scan?
>>>>>>>
>>>>>>> It is necessary. I suppose what we should return as a partial result
>>>>>>> from PhoenixRecordReader is exactly the same as what we do in standalone
>>>>>>> Phoenix+Calcite, except that the result is partial or say incomplete. For
>>>>>>> example, we have "select a, count(*) from t group by a", we should return
>>>>>>> rows that have "a" as the first expression value, and "count(*)" as the
>>>>>>> second expression value. For this "count" expression, it actually needs a
>>>>>>> ClientAggregator for evaluation, and that's what this
>>>>>>> GroupedAggregatingResultIterator is used for.
>>>>>>> Since "each server-side scan will produce results with a single
>>>>>>> tuple per group by key", and PhoenixRecordReader is only dealing with one
>>>>>>> server-side result each time, we don't care how the group-by keys are
>>>>>>> arranged (ordered or unordered). Actually
>>>>>>> GroupedAggregatingResultIterator is not the group-by iterator we
>>>>>>> use for AggregatePlan. It does not "combine". It treats every row as a
>>>>>>> different group, by returning its rowkey as the group-by key (
>>>>>>> GroupedAggregatingResultIterator.java:56).
>>>>>>>
>>>>>>> In short, this iterator is for decoding the server-side values. So
>>>>>>> we may want to optimize this logic by removing this serialization and
>>>>>>> deserialization and having only one set of aggregators in future.
>>>>>>>
>>>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>>>> scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>>>>>> as each of our scans could include duplicate group by keys. Is it ok to
>>>>>>> combine them in this case?
>>>>>>>
>>>>>>> It should not matter, or at least is not related to the problem I'm
>>>>>>> now having.
>>>>>>>
>>>>>>> bq. One more question: how is the group by key communicated back to
>>>>>>> Drill?
>>>>>>>
>>>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>>>>>> PhoenixHashAggPrule). I assume then the partial results get shuffled based
>>>>>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>>>>>> final step is the Drill hash aggregation.
>>>>>>>
>>>>>>>

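The distinction drawn in the quoted discussion between a full sort, a merge, and a concat can be illustrated with a standalone sketch. It uses only java.util and is neither Drill's merge receiver nor Phoenix's iterator code; MergeSketch, mergeSorted, and concat are hypothetical names. It assumes each input list is already sorted, as a per-scan result would be when the scan returns rows in key order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

class MergeSketch {

    // N-way merge: repeatedly take the smallest head element among the inputs.
    // For k inputs this costs O(total * log k), versus O(total * log total)
    // for sorting everything from scratch.
    static List<Integer> mergeSorted(List<List<Integer>> sortedInputs) {
        // Each queue entry is {value, inputIndex, positionInInput}.
        PriorityQueue<int[]> heads =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < sortedInputs.size(); i++) {
            if (!sortedInputs.get(i).isEmpty()) {
                heads.add(new int[] {sortedInputs.get(i).get(0), i, 0});
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            merged.add(head[0]);
            List<Integer> source = sortedInputs.get(head[1]);
            int next = head[2] + 1;
            if (next < source.size()) {
                heads.add(new int[] {source.get(next), head[1], next});
            }
        }
        return merged;
    }

    // Concat: only correct when the inputs are sorted, their key ranges do not
    // overlap, and they are supplied in key order.
    static List<Integer> concat(List<List<Integer>> orderedInputs) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> input : orderedInputs) {
            out.addAll(input);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> overlapping = Arrays.asList(
            Arrays.asList(1, 4, 7), Arrays.asList(2, 5, 8), Arrays.asList(3, 6, 9));
        System.out.println(mergeSorted(overlapping)); // [1, 2, 3, 4, 5, 6, 7, 8, 9]

        List<List<Integer>> disjoint = Arrays.asList(
            Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
        System.out.println(concat(disjoint)); // [1, 2, 3, 4, 5, 6]
    }
}
```

The concat case corresponds to the situation where the planner knows the order among slices, so no comparison between inputs is needed at all.
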
Re: Partial aggregation in Drill-on-Phoenix

Posted by Maryann Xue <ma...@gmail.com>.
Made another checkin for the pull request. All good now.

In order to compile and run, be sure to update the Phoenix project under
Julian's branch.


Thanks,
Maryann

On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <ja...@dremio.com> wrote:

> I just filed a jira for the merge issue:
>
> https://issues.apache.org/jira/browse/DRILL-3907
>
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
> On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <ja...@dremio.com> wrote:
>
>> Drill doesn't currently have a merge-sort operation available outside the
>> context of an exchange. See here:
>>
>>
>> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>>
>> We'll need to do a bit of refactoring to provide this functionality
>> outside the context of an exchange. The one other thing we'll have to think
>> about in this context is how we avoid doing an n-way merge in the case
>> that we're not using the collation.
>>
>> --
>> Jacques Nadeau
>> CTO and Co-Founder, Dremio
>>
>> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <ma...@gmail.com>
>> wrote:
>>
>>> One thing from what I asked James offline yesterday, and maybe we can
>>> discuss a little bit in today's meeting:
>>>
>>> Phoenix uses a list of lists of Scan objects to indicate Region
>>> boundaries and guideposts, and if the top-level list contains more than one
>>> element it means that the results from different Scanner/ResultIterator
>>> should be merge-sorted. We now use this list in Drill integration to
>>> generate different batches or slices. I see from the Drill plan of a simple
>>> select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of
>>> the PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
>>> So optimally,
>>> 1) this should be a merge-sort (to be more accurate, a merge)
>>> 2) furthermore, if Drill has something to indicate the order among
>>> slices and batches, we could even turn it into a concat.
>>>
>>> The structure of this Scan list might be helpful for 2), or we may have
>>> some Logical representation for this. Otherwise, we can simply flatten this
>>> list to a one-dimensional list as we do now (in my ci yesterday).
>>>
>>>
>>>
>>> Thanks,
>>> Maryann
>>>
>>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <ma...@gmail.com>
>>> wrote:
>>>
>>>> Yes, but the partially aggregated results will not contain any
>>>> duplicate rowkeys, since they are also group-by keys. What we need is the
>>>> aggregators and call aggregate for each row. We can write a new simpler
>>>> ResultIterator to replace this, but for now it should work correctly.
>>>>
>>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <ja...@apache.org>
>>>> wrote:
>>>>
>>>>> The results we get back from the server-side scan are already the
>>>>> partial aggregated values we need. GroupedAggregatingResultIterator
>>>>> will collapse adjacent Tuples together which happen to have the same row
>>>>> key. I'm not sure we want/need this to happen. Instead I think we just need
>>>>> to decode the aggregated values directly from the result of the scan.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <ma...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi James,
>>>>>>
>>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>> back from the scan?
>>>>>>
>>>>>> It is necessary. I suppose what we should return as a partial result
>>>>>> from PhoenixRecordReader is exactly the same as what we do in standalone
>>>>>> Phoenix+Calcite, except that the result is partial or say incomplete. For
>>>>>> example, we have "select a, count(*) from t group by a", we should return
>>>>>> rows that have "a" as the first expression value, and "count(*)" as the
>>>>>> second expression value. For this "count" expression, it actually needs a
>>>>>> ClientAggregator for evaluation, and that's what this
>>>>>> GroupedAggregatingResultIterator is used for.
>>>>>> Since "each server-side scan will produce results with a single
>>>>>> tuple per group by key", and PhoenixRecordReader is only dealing with one
>>>>>> server-side result each time, we don't care how the group-by keys are
>>>>>> arranged (ordered or unordered). Actually
>>>>>> GroupedAggregatingResultIterator is not the group-by iterator we use
>>>>>> for AggregatePlan. It does not "combine". It treats every row as a
>>>>>> different group, by returning its rowkey as the group-by key (
>>>>>> GroupedAggregatingResultIterator.java:56).
>>>>>>
>>>>>> In short, this iterator is for decoding the server-side values. So we
>>>>>> may want to optimize this logic by removing this serialization and
>>>>>> deserialization and having only one set of aggregators in future.
>>>>>>
>>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>>> scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>>>>> as each of our scans could include duplicate group by keys. Is it ok to
>>>>>> combine them in this case?
>>>>>>
>>>>>> It should not matter, or at least is not related to the problem I'm
>>>>>> now having.
>>>>>>
>>>>>> bq. One more question: how is the group by key communicated back to
>>>>>> Drill?
>>>>>>
>>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>>>>> PhoenixHashAggPrule). I assume then the partial results get shuffled based
>>>>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>>>>> final step is the Drill hash aggregation.
>>>>>>
>>>>>>
>>>>>> This is my test table "A.BEER", which has four columns: "B", "E1",
>>>>>> "E2", "R", all of INTEGER types. And the data is generated like this:
>>>>>> for (x=1 to N) { //currently N=1000
>>>>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>>> }
>>>>>>
>>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>>>> GROUP BY e1".
>>>>>> The expected result should be:
>>>>>> 0 100
>>>>>> 1 100
>>>>>> 2 100
>>>>>> 3 100
>>>>>> 4 100
>>>>>> 5 100
>>>>>> 6 100
>>>>>> 7 100
>>>>>> 8 100
>>>>>> 9 100
>>>>>> The actual result was:
>>>>>> 6 0
>>>>>> 7 0
>>>>>> 8 0
>>>>>> 9 0
>>>>>> 0 0
>>>>>> 1 100
>>>>>> 2 100
>>>>>> 3 100
>>>>>> 4 100
>>>>>> 5 100
>>>>>>
>>>>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer GROUP
>>>>>> BY e2".
>>>>>> Similarly, the expected result should have group-by keys from 0 to
>>>>>> 99, each having a value of 10 as the count, while the actual result was:
>>>>>> from group-by key 86 to 99, together with 0, their count values were
>>>>>> all 0; the rest (1 to 85) all had the correct value 10.
>>>>>>
>>>>>> Looks to me that the scans were good but there was a problem with one
>>>>>> of the hash buckets.
>>>>>>
>>>>>> Thanks,
>>>>>> Maryann
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <ja...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Nice progress, Maryann.
>>>>>>>
>>>>>>> A few questions for you: not sure I understand the changes you made
>>>>>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>>> back from the scan?
>>>>>>>
>>>>>>> Also, not sure what impact it has in the way we "combine" the scans
>>>>>>> in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
>>>>>>> each of our scans could include duplicate group by keys. Is it ok to
>>>>>>> combine them in this case?
>>>>>>>
>>>>>>> One more question: how is the group by key communicated back to
>>>>>>> Drill?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> James
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Added a few fixes in the pull request. Tested with two regions,
>>>>>>>> turned out that half of the result is empty (count = 0).
>>>>>>>> Not sure if there's anything wrong with
>>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>>> .
>>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>>>
>>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>>
>>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>> index b911f6b..58bc918 100644
>>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>> -      return false;
>>>>>>>> -    }
>>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>> +//      return false;
>>>>>>>> +//    }
>>>>>>>>
>>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Maryann
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Drill's current approach seems adequate for Drill alone but
>>>>>>>>> extending
>>>>>>>>> it to a heterogeneous system that includes Phoenix seems like a
>>>>>>>>> hack.
>>>>>>>>>
>>>>>>>>> I think you should only create Prels for algebra nodes that you
>>>>>>>>> know
>>>>>>>>> for sure are going to run on the Drill engine. If there's a
>>>>>>>>> possibility that it would run in another engine such as Phoenix
>>>>>>>>> then
>>>>>>>>> they should still be logical.
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <
>>>>>>>>> maryann.xue@gmail.com> wrote:
>>>>>>>>> > The partial aggregate seems to be working now, with one
>>>>>>>>> interface extension
>>>>>>>>> > and one bug fix in the Phoenix project. Will do some code
>>>>>>>>> cleanup and
>>>>>>>>> > create a pull request soon.
>>>>>>>>> >
>>>>>>>>> > Still there was a hack in the Drill project which I made to
>>>>>>>>> force 2-phase
>>>>>>>>> > aggregation. I'll try to fix that.
>>>>>>>>> >
>>>>>>>>> > Jacques, I have one question though, how can I verify that there
>>>>>>>>> are more
>>>>>>>>> > than one slice and the shuffle happens?
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > Thanks,
>>>>>>>>> > Maryann
>>>>>>>>> >
>>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <
>>>>>>>>> jamestaylor@apache.org> wrote:
>>>>>>>>> >
>>>>>>>>> >> Maryann,
>>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>>>>> required
>>>>>>>>> >> for a merge sort to occur - there's something that does that,
>>>>>>>>> but it's not
>>>>>>>>> >> expected to be used in this context currently.
>>>>>>>>> >>
>>>>>>>>> >> IMHO, there's more of a clear value in getting the aggregation
>>>>>>>>> to use
>>>>>>>>> >> Phoenix first, so I'd recommend going down that road as Jacques
>>>>>>>>> mentioned
>>>>>>>>> >> above if possible. Once that's working, we can circle back to
>>>>>>>>> the partial
>>>>>>>>> >> sort.
>>>>>>>>> >>
>>>>>>>>> >> Thoughts?
>>>>>>>>> >> James
>>>>>>>>> >>
>>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <
>>>>>>>>> maryann.xue@gmail.com>
>>>>>>>>> >> wrote:
>>>>>>>>> >>
>>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured
>>>>>>>>> might be a
>>>>>>>>> >>> little easier to start with than partial aggregation. But I
>>>>>>>>> found that even
>>>>>>>>> >>> though the code worked (returned the right results), the Drill
>>>>>>>>> side sort
>>>>>>>>> >>> turned out to be an ordinary sort instead of a merge which it
>>>>>>>>> should have
>>>>>>>>> >>> been. Any idea of how to fix that?
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>> Thanks,
>>>>>>>>> >>> Maryann
>>>>>>>>> >>>
>>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <
>>>>>>>>> jacques@dremio.com>
>>>>>>>>> >>> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>> >>>>
>>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>>> >>>>
>>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>>>>> providing a rule
>>>>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix
>>>>>>>>> convention as
>>>>>>>>> >>>> input. It would replace everywhere but would only be
>>>>>>>>> plannable when it is
>>>>>>>>> >>>> the first phase of aggregation.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Thoughts?
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>> --
>>>>>>>>> >>>> Jacques Nadeau
>>>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>>>> >>>>
>>>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>> >>>>
>>>>>>>>> >>>>> Phoenix is able to perform quite a few relational operations
>>>>>>>>> on the
>>>>>>>>> >>>>> region server: scan, filter, project, aggregate, sort
>>>>>>>>> (optionally with
>>>>>>>>> >>>>> limit). However, the sort and aggregate are necessarily
>>>>>>>>> "local". They
>>>>>>>>> >>>>> can only deal with data on that region server, and there
>>>>>>>>> needs to be a
>>>>>>>>> >>>>> further operation to combine the results from the region
>>>>>>>>> servers.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> The question is how to plan such queries. I think the answer
>>>>>>>>> is an
>>>>>>>>> >>>>> AggregateExchangeTransposeRule.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is
>>>>>>>>> split into
>>>>>>>>> >>>>> multiple locations (partitions) and split it into a partial
>>>>>>>>> Aggregate
>>>>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that
>>>>>>>>> combines
>>>>>>>>> >>>>> those totals.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be
>>>>>>>>> split? Since
>>>>>>>>> >>>>> the data's distribution has changed, there would need to be
>>>>>>>>> an
>>>>>>>>> >>>>> Exchange operator. It is the Exchange operator that triggers
>>>>>>>>> the rule
>>>>>>>>> >>>>> to fire.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> There are some special cases. If the data is sorted as well
>>>>>>>>> as
>>>>>>>>> >>>>> partitioned (say because the local aggregate uses a
>>>>>>>>> sort-based
>>>>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if
>>>>>>>>> the
>>>>>>>>> >>>>> partition key is the same as the aggregation key we don't
>>>>>>>>> need a
>>>>>>>>> >>>>> summarizing Aggregate, just a Union.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>>>>> Drill-on-Phoenix
>>>>>>>>> >>>>> scenario, once the Aggregate has been pushed through the
>>>>>>>>> Exchange
>>>>>>>>> >>>>> (i.e. onto the drill-bit residing on the region server) we
>>>>>>>>> can then
>>>>>>>>> >>>>> push the DrillAggregate across the drill-to-phoenix membrane
>>>>>>>>> and make
>>>>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the region
>>>>>>>>> server.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Related issues:
>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Julian
>>>>>>>>> >>>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>
>>>>>>>>> >>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Partial aggregation in Drill-on-Phoenix

Posted by Jacques Nadeau <ja...@dremio.com>.
I just filed a jira for the merge issue:

https://issues.apache.org/jira/browse/DRILL-3907

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <ja...@dremio.com> wrote:

> Drill doesn't currently have a merge-sort operation available outside the
> context of an exchange. See here:
>
>
> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>
> We'll need to do a bit of refactoring to provide this functionality
> outside the context of an exchange. The one other thing we'll have to think
> about in this context is how we avoid doing an n-way merge in the case
> that we're not using the collation.
>
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <ma...@gmail.com> wrote:
>
>> One thing from what I asked James offline yesterday, and maybe we can
>> discuss a little bit in today's meeting:
>>
>> Phoenix uses a list of lists of Scan objects to indicate Region
>> boundaries and guideposts, and if the top-level list contains more than one
>> element it means that the results from different Scanner/ResultIterator
>> should be merge-sorted. We now use this list in Drill integration to
>> generate different batches or slices. I see from the Drill plan of a simple
>> select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of
>> the PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
>> So optimally,
>> 1) this should be a merge-sort (to be more accurate, a merge)
>> 2) furthermore, if Drill has something to indicate the order among slices
>> and batches, we could even turn it into a concat.
>>
>> The structure of this Scan list might be helpful for 2), or we may have
>> some Logical representation for this. Otherwise, we can simply flatten this
>> list to a one-dimensional list as we do now (in my ci yesterday).
>>
>>
>>
>> Thanks,
>> Maryann
>>
>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <ma...@gmail.com>
>> wrote:
>>
>>> Yes, but the partially aggregated results will not contain any duplicate
>>> rowkeys, since they are also group-by keys. What we need is the aggregators
>>> and call aggregate for each row. We can write a new simpler ResultIterator
>>> to replace this, but for now it should work correctly.
>>>
>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <ja...@apache.org>
>>> wrote:
>>>
>>>> The results we get back from the server-side scan are already the
>>>> partial aggregated values we need. GroupedAggregatingResultIterator
>>>> will collapse adjacent Tuples together which happen to have the same row
>>>> key. I'm not sure we want/need this to happen. Instead I think we just need
>>>> to decode the aggregated values directly from the result of the scan.
>>>>
>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi James,
>>>>>
>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>> back from the scan?
>>>>>
>>>>> It is necessary. I suppose what we should return as a partial result
>>>>> from PhoenixRecordReader is exactly the same as what we do in standalone
>>>>> Phoenix+Calcite, except that the result is partial or say incomplete. For
>>>>> example, we have "select a, count(*) from t group by a", we should return
>>>>> rows that have "a" as the first expression value, and "count(*)" as the
>>>>> second expression value. For this "count" expression, it actually needs a
>>>>> ClientAggregator for evaluation, and that's what this
>>>>> GroupedAggregatingResultIterator is used for.
>>>>> Since "each server-side scan will produce results with a single tuple
>>>>> per group by key", and PhoenixRecordReader is only dealing with one
>>>>> server-side result each time, we don't care how the group-by keys are
>>>>> arranged (ordered or unordered). Actually
>>>>> GroupedAggregatingResultIterator is not the group-by iterator we use
>>>>> for AggregatePlan. It does not "combine". It treats every row as a
>>>>> different group, by returning its rowkey as the group-by key (
>>>>> GroupedAggregatingResultIterator.java:56).
>>>>>
>>>>> In short, this iterator is for decoding the server-side values. So we
>>>>> may want to optimize this logic by removing this serialization and
>>>>> deserialization and having only one set of aggregators in future.
>>>>>
>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>> scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>>>> as each of our scans could include duplicate group by keys. Is it ok to
>>>>> combine them in this case?
>>>>>
>>>>> It should not matter, or at least is not related to the problem I'm
>>>>> now having.
>>>>>
>>>>> bq. One more question: how is the group by key communicated back to
>>>>> Drill?
>>>>>
>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>>>> PhoenixHashAggPrule). I assume then the partial results get shuffled based
>>>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>>>> final step is the Drill hash aggregation.
>>>>>
>>>>>
>>>>> This is my test table "A.BEER", which has four columns: "B", "E1",
>>>>> "E2", "R", all of INTEGER types. And the data is generated like this:
>>>>> for (x=1 to N) { //currently N=1000
>>>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>> }
>>>>>
>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>>> GROUP BY e1".
>>>>> The expected result should be:
>>>>> 0 100
>>>>> 1 100
>>>>> 2 100
>>>>> 3 100
>>>>> 4 100
>>>>> 5 100
>>>>> 6 100
>>>>> 7 100
>>>>> 8 100
>>>>> 9 100
>>>>> The actual result was:
>>>>> 6 0
>>>>> 7 0
>>>>> 8 0
>>>>> 9 0
>>>>> 0 0
>>>>> 1 100
>>>>> 2 100
>>>>> 3 100
>>>>> 4 100
>>>>> 5 100
>>>>>
>>>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer GROUP
>>>>> BY e2".
>>>>> Similarly, the expected result should have group-by keys from 0 to 99,
>>>>> each having a value of 10 as the count, while the actual result was:
>>>>> from group-by key 86 to 99, together with 0, their count values were
>>>>> all 0; the rest (1 to 85) all had the correct value 10.
>>>>>
>>>>> Looks to me that the scans were good but there was a problem with one
>>>>> of the hash buckets.
>>>>>
>>>>> Thanks,
>>>>> Maryann
>>>>>
>>>>>
>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <ja...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Nice progress, Maryann.
>>>>>>
>>>>>> A few questions for you: not sure I understand the changes you made
>>>>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>>> back from the scan?
>>>>>>
>>>>>> Also, not sure what impact it has in the way we "combine" the scans
>>>>>> in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
>>>>>> each of our scans could include duplicate group by keys. Is it ok to
>>>>>> combine them in this case?
>>>>>>
>>>>>> One more question: how is the group by key communicated back to Drill?
>>>>>>
>>>>>> Thanks,
>>>>>> James
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <ma...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Added a few fixes in the pull request. Tested with two regions,
>>>>>>> turned out that half of the result is empty (count = 0).
>>>>>>> Not sure if there's anything wrong with
>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>> .
>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>>
>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>
>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> index b911f6b..58bc918 100644
>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>> -      return false;
>>>>>>> -    }
>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>> +//      return false;
>>>>>>> +//    }
>>>>>>>
>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Maryann
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Drill's current approach seems adequate for Drill alone but
>>>>>>>> extending
>>>>>>>> it to a heterogeneous system that includes Phoenix seems like a hack.
>>>>>>>>
>>>>>>>> I think you should only create Prels for algebra nodes that you know
>>>>>>>> for sure are going to run on the Drill engine. If there's a
>>>>>>>> possibility that it would run in another engine such as Phoenix then
>>>>>>>> they should still be logical.
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <ma...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> > The partial aggregate seems to be working now, with one interface
>>>>>>>> extension
>>>>>>>> > and one bug fix in the Phoenix project. Will do some code cleanup
>>>>>>>> and
>>>>>>>> > create a pull request soon.
>>>>>>>> >
>>>>>>>> > Still there was a hack in the Drill project which I made to force
>>>>>>>> 2-phase
>>>>>>>> > aggregation. I'll try to fix that.
>>>>>>>> >
>>>>>>>> > Jacques, I have one question though, how can I verify that there
>>>>>>>> are more
>>>>>>>> > than one slice and the shuffle happens?
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > Thanks,
>>>>>>>> > Maryann
>>>>>>>> >
>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <
>>>>>>>> jamestaylor@apache.org> wrote:
>>>>>>>> >
>>>>>>>> >> Maryann,
>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>>>> required
>>>>>>>> >> for a merge sort to occur - there's something that does that,
>>>>>>>> but it's not
>>>>>>>> >> expected to be used in this context currently.
>>>>>>>> >>
>>>>>>>> >> IMHO, there's more of a clear value in getting the aggregation
>>>>>>>> to use
>>>>>>>> >> Phoenix first, so I'd recommend going down that road as Jacques
>>>>>>>> mentioned
>>>>>>>> >> above if possible. Once that's working, we can circle back to
>>>>>>>> the partial
>>>>>>>> >> sort.
>>>>>>>> >>
>>>>>>>> >> Thoughts?
>>>>>>>> >> James
>>>>>>>> >>
>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <
>>>>>>>> maryann.xue@gmail.com>
>>>>>>>> >> wrote:
>>>>>>>> >>
>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured
>>>>>>>> might be a
>>>>>>>> >>> little easier to start with than partial aggregation. But I
>>>>>>>> found that even
>>>>>>>> >>> though the code worked (returned the right results), the Drill
>>>>>>>> side sort
>>>>>>>> >>> turned out to be an ordinary sort instead of a merge which it
>>>>>>>> should have
>>>>>>>> >>> been. Any idea of how to fix that?
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> Thanks,
>>>>>>>> >>> Maryann
>>>>>>>> >>>
>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <
>>>>>>>> jacques@dremio.com>
>>>>>>>> >>> wrote:
>>>>>>>> >>>
>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>> >>>>
>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>> >>>>
>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>>>> providing a rule
>>>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix
>>>>>>>> convention as
>>>>>>>> >>>> input. It would replace everywhere but would only be plannable
>>>>>>>> when it is
>>>>>>>> >>>> the first phase of aggregation.
>>>>>>>> >>>>
>>>>>>>> >>>> Thoughts?
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>> --
>>>>>>>> >>>> Jacques Nadeau
>>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>>> >>>>
>>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org>
>>>>>>>> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>>> Phoenix is able to perform quite a few relational operations
>>>>>>>> on the
>>>>>>>> >>>>> region server: scan, filter, project, aggregate, sort
>>>>>>>> (optionally with
>>>>>>>> >>>>> limit). However, the sort and aggregate are necessarily
>>>>>>>> "local". They
>>>>>>>> >>>>> can only deal with data on that region server, and there
>>>>>>>> needs to be a
>>>>>>>> >>>>> further operation to combine the results from the region
>>>>>>>> servers.
>>>>>>>> >>>>>
>>>>>>>> >>>>> The question is how to plan such queries. I think the answer
>>>>>>>> is an
>>>>>>>> >>>>> AggregateExchangeTransposeRule.
>>>>>>>> >>>>>
>>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is
>>>>>>>> split into
>>>>>>>> >>>>> multiple locations (partitions) and split it into a partial
>>>>>>>> Aggregate
>>>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that
>>>>>>>> combines
>>>>>>>> >>>>> those totals.
>>>>>>>> >>>>>
>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be
>>>>>>>> split? Since
>>>>>>>> >>>>> the data's distribution has changed, there would need to be an
>>>>>>>> >>>>> Exchange operator. It is the Exchange operator that triggers
>>>>>>>> the rule
>>>>>>>> >>>>> to fire.
>>>>>>>> >>>>>
>>>>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>>>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if
>>>>>>>> the
>>>>>>>> >>>>> partition key is the same as the aggregation key we don't
>>>>>>>> need a
>>>>>>>> >>>>> summarizing Aggregate, just a Union.
>>>>>>>> >>>>>
>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>>>> Drill-on-Phoenix
>>>>>>>> >>>>> scenario, once the Aggregate has been pushed through the
>>>>>>>> Exchange
>>>>>>>> >>>>> (i.e. onto the drill-bit residing on the region server) we
>>>>>>>> can then
>>>>>>>> >>>>> push the DrillAggregate across the drill-to-phoenix membrane
>>>>>>>> and make
>>>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the region
>>>>>>>> server.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Related issues:
>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>> >>>>>
>>>>>>>> >>>>> Julian
>>>>>>>> >>>>>
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>
>>>>>>>> >>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Partial aggregation in Drill-on-Phoenix

Posted by Jacques Nadeau <ja...@dremio.com>.
Drill doesn't currently have a merge-sort operation available outside the
context of an exchange. See here:

https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver

We'll need to do a bit of refactoring to provide this functionality outside
the context of an exchange. The one other thing we'll have to think about
in this context is how to avoid doing an n-way merge when we're not using
the collation.
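
For reference, a minimal sketch of what an n-way merge over already-sorted
inputs looks like in plain Java. KWayMerge and Cursor are hypothetical names
used only for illustration; this is not the mergereceiver code linked above
and it uses no Drill or Phoenix API. It assumes every input is sorted by the
same comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration only; not a Drill or Phoenix class. */
final class KWayMerge {

  /** One sorted input plus the row currently at its head. */
  private static final class Cursor<T> {
    final Iterator<T> source;
    T head;

    Cursor(Iterator<T> source) {
      this.source = source;
      this.head = source.next(); // only constructed for non-empty inputs
    }
  }

  /** Merges inputs that are each sorted by 'order' into one sorted list. */
  static <T> List<T> merge(List<? extends Iterable<T>> sortedInputs,
                           Comparator<? super T> order) {
    // The heap holds one cursor per non-exhausted input, keyed by its head row.
    Comparator<Cursor<T>> byHead = (a, b) -> order.compare(a.head, b.head);
    PriorityQueue<Cursor<T>> heap = new PriorityQueue<>(byHead);
    for (Iterable<T> input : sortedInputs) {
      Iterator<T> it = input.iterator();
      if (it.hasNext()) {
        heap.add(new Cursor<>(it));
      }
    }
    List<T> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Cursor<T> smallest = heap.poll();
      out.add(smallest.head);
      if (smallest.source.hasNext()) {
        // Advance the input we just consumed from and put it back on the heap.
        smallest.head = smallest.source.next();
        heap.add(smallest);
      }
    }
    return out;
  }
}
```

Each output row costs one heap operation, which is the overhead we would
want to skip when the consumer does not need the collation: in that case the
inputs can simply be read one after another.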

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <ma...@gmail.com> wrote:

> One thing from what I asked James offline yesterday, and maybe we can
> discuss a little bit in today's meeting:
>
> Phoenix uses a list of lists of Scan objects to indicate Region boundaries
> and guideposts, and if the top-level list contains more than one element it
> means that the results from different Scanner/ResultIterator should be
> merge-sorted. We now use this list in Drill integration to generate
> different batches or slices. I see from the Drill plan of a simple select
> like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of the
> PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
> So optimally,
> 1) this should be a merge-sort (to be more accurate, a merge)
> 2) furthermore, if Drill has something to indicate the order among slices
> and batches, we could even turn it into a concat.
>
> The structure of this Scan list might be helpful for 2), or we may have
> some Logical representation for this. Otherwise, we can simply flatten this
> list to a one-dimensional list as we do now (in my ci yesterday).
>
>
>
> Thanks,
> Maryann
>
> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <ma...@gmail.com> wrote:
>
>> Yes, but the partially aggregated results will not contain any duplicate
>> rowkeys, since they are also group-by keys. What we need is the aggregators
>> and call aggregate for each row. We can write a new simpler ResultIterator
>> to replace this, but for now it should work correctly.
>>
>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <ja...@apache.org>
>> wrote:
>>
>>> The results we get back from the server-side scan are already the
>>> partial aggregated values we need. GroupedAggregatingResultIterator
>>> will collapse adjacent Tuples together which happen to have the same row
>>> key. I'm not sure we want/need this to happen. Instead I think we just need
>>> to decode the aggregated values directly from the result of the scan.
>>>
>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <ma...@gmail.com>
>>> wrote:
>>>
>>>> Hi James,
>>>>
>>>> bq. A few questions for you: not sure I understand the changes you made
>>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>> merge. Note too that the results aren't sorted that come back from the
>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>> by the group by key). Or is this just to help in decoding the values coming
>>>> back from the scan?
>>>>
>>>> It is necessary. I suppose what we should return as a partial result
>>>> from PhoenixRecordReader is exactly the same as what we do in standalone
>>>> Phoenix+Calcite, except that the result is partial or say incomplete. For
>>>> example, we have "select a, count(*) from t group by a", we should return
>>>> rows that have "a" as the first expression value, and "count(*)" as the
>>>> second expression value. For this "count" expression, it actually needs a
>>>> ClientAggregator for evaluation, and that's what this
>>>> GroupedAggregatingResultIterator is used for.
>>>> Since "each server-side scan will produce results with a single tuple
>>>> per group by key", and PhoenixRecordReader is only dealing with one
>>>> server-side result each time, we don't care how the group-by keys are
>>>> arranged (ordered or unordered). Actually
>>>> GroupedAggregatingResultIterator is not the group-by iterator we use
>>>> for AggregatePlan. It does not "combine". It treats every row as a
>>>> different group, by returning its rowkey as the group-by key (
>>>> GroupedAggregatingResultIterator.java:56).
>>>>
>>>> In short, this iterator is for decoding the server-side values. So we
>>>> may want to optimize this logic by removing this serialization and
>>>> deserialization and having only one set of aggregators in future.
>>>>
>>>> bq. Also, not sure what impact it has in the way we "combine" the scans
>>>> in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>>> as each of our scans could include duplicate group by keys. Is it ok to
>>>> combine them in this case?
>>>>
>>>> It should not matter, or at least is not related to the problem I'm now
>>>> having.
>>>>
>>>> bq. One more question: how is the group by key communicated back to
>>>> Drill?
>>>>
>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>>> PhoenixHashAggPrule). I assume then the partial results get shuffled based
>>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>>> final step is the Drill hash aggregation.
>>>>
>>>>
>>>> This is my test table "A.BEER", which has four columns: "B", "E1", "E2",
>>>> "R", all of type INTEGER. And the data is generated like this:
>>>> for (x=1 to N) { //currently N=1000
>>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>> }
>>>>
>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>> GROUP BY e1".
>>>> The expected result should be:
>>>> 0 100
>>>> 1 100
>>>> 2 100
>>>> 3 100
>>>> 4 100
>>>> 5 100
>>>> 6 100
>>>> 7 100
>>>> 8 100
>>>> 9 100
>>>> The actual result was:
>>>> 6 0
>>>> 7 0
>>>> 8 0
>>>> 9 0
>>>> 0 0
>>>> 1 100
>>>> 2 100
>>>> 3 100
>>>> 4 100
>>>> 5 100
>>>>
>>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer GROUP
>>>> BY e2".
>>>> Similarly, the expected result should have group-by keys from 0 to 99,
>>>> each having a value of 10 as the count, while the actual result was:
>>>> from group-by key 86 to 99, together with 0, their count values were
>>>> all 0; the rest (1 to 85) all had the correct value 10.
>>>>
>>>> Looks to me that the scans were good but there was a problem with one
>>>> of the hash buckets.
>>>>
>>>> Thanks,
>>>> Maryann
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <ja...@apache.org>
>>>> wrote:
>>>>
>>>>> Nice progress, Maryann.
>>>>>
>>>>> A few questions for you: not sure I understand the changes you made to
>>>>> PhoenixRecordReader. Is it necessary to wrap the server-side scan results
>>>>> in a GroupedAggregatingResultIterator? Each server-side scan will produce
>>>>> results with a single tuple per group by key. In Phoenix, the
>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>>> by the group by key). Or is this just to help in decoding the values coming
>>>>> back from the scan?
>>>>>
>>>>> Also, not sure what impact it has in the way we "combine" the scans in
>>>>> our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
>>>>> each of our scans could include duplicate group by keys. Is it ok to
>>>>> combine them in this case?
>>>>>
>>>>> One more question: how is the group by key communicated back to Drill?
>>>>>
>>>>> Thanks,
>>>>> James
>>>>>
>>>>>
>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <ma...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Added a few fixes in the pull request. Tested with two regions,
>>>>>> turned out that half of the result is empty (count = 0).
>>>>>> Not sure if there's anything wrong with
>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>> .
>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>
>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>
>>>>>> diff --git
>>>>>> a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>> b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>
>>>>>> index b911f6b..58bc918 100644
>>>>>>
>>>>>> ---
>>>>>> a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>
>>>>>> +++
>>>>>> b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>
>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule
>>>>>> {
>>>>>>
>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>
>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>
>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call,
>>>>>> DrillAggregateRel aggregate) {
>>>>>>
>>>>>> -    PlannerSettings settings =
>>>>>> PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>
>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>
>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>
>>>>>> -    if (! settings.isMultiPhaseAggEnabled() ||
>>>>>> settings.isSingleMode() || smallInput) {
>>>>>>
>>>>>> -      return false;
>>>>>>
>>>>>> -    }
>>>>>>
>>>>>> +//    PlannerSettings settings =
>>>>>> PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>
>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>
>>>>>> +//    boolean smallInput = child.getRows() <
>>>>>> settings.getSliceTarget();
>>>>>>
>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() ||
>>>>>> settings.isSingleMode() || smallInput) {
>>>>>>
>>>>>> +//      return false;
>>>>>>
>>>>>> +//    }
>>>>>>
>>>>>>
>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>
>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Maryann
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org> wrote:
>>>>>>
>>>>>>> Drill's current approach seems adequate for Drill alone but extending
>>>>>>> it to a heterogenous system that includes Phoenix seems like a hack.
>>>>>>>
>>>>>>> I think you should only create Prels for algebra nodes that you know
>>>>>>> for sure are going to run on the Drill engine. If there's a
>>>>>>> possibility that it would run in another engine such as Phoenix then
>>>>>>> they should still be logical.
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>> > The partial aggregate seems to be working now, with one interface
>>>>>>> extension
>>>>>>> > and one bug fix in the Phoenix project. Will do some code cleanup
>>>>>>> and
>>>>>>> > create a pull request soon.
>>>>>>> >
>>>>>>> > Still there was a hack in the Drill project which I made to force
>>>>>>> 2-phase
>>>>>>> > aggregation. I'll try to fix that.
>>>>>>> >
>>>>>>> > Jacques, I have one question though, how can I verify that there
>>>>>>> are more
>>>>>>> > than one slice and the shuffle happens?
>>>>>>> >
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Maryann
>>>>>>> >
>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <
>>>>>>> jamestaylor@apache.org> wrote:
>>>>>>> >
>>>>>>> >> Maryann,
>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>>> required
>>>>>>> >> for a merge sort to occur - there's something that does that, but
>>>>>>> it's not
>>>>>>> >> expected to be used in this context currently.
>>>>>>> >>
>>>>>>> >> IMHO, there's more of a clear value in getting the aggregation to
>>>>>>> use
>>>>>>> >> Phoenix first, so I'd recommend going down that road as Jacques
>>>>>>> mentioned
>>>>>>> >> above if possible. Once that's working, we can circle back to the
>>>>>>> partial
>>>>>>> >> sort.
>>>>>>> >>
>>>>>>> >> Thoughts?
>>>>>>> >> James
>>>>>>> >>
>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <
>>>>>>> maryann.xue@gmail.com>
>>>>>>> >> wrote:
>>>>>>> >>
>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured
>>>>>>> might be a
>>>>>>> >>> little easier to start with than partial aggregation. But I
>>>>>>> found that even
>>>>>>> >>> though the code worked (returned the right results), the Drill
>>>>>>> side sort
>>>>>>> >>> turned out to be an ordinary sort instead of a merge which it
>>>>>>> should have
>>>>>>> >>> been. Any idea of how to fix that?
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> Thanks,
>>>>>>> >>> Maryann
>>>>>>> >>>
>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <
>>>>>>> jacques@dremio.com>
>>>>>>> >>> wrote:
>>>>>>> >>>
>>>>>>> >>>> Right now this type of work is done here:
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> >>>>
>>>>>>> >>>> With Distribution Trait application here:
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>> >>>>
>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>>> providing a rule
>>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix
>>>>>>> convention as
>>>>>>> >>>> input. It would replace everywhere but would only be plannable
>>>>>>> when it is
>>>>>>> >>>> the first phase of aggregation.
>>>>>>> >>>>
>>>>>>> >>>> Thoughts?
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> >>>> --
>>>>>>> >>>> Jacques Nadeau
>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>> >>>>
>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org>
>>>>>>> wrote:
>>>>>>> >>>>
>>>>>>> >>>>> Phoenix is able to perform quite a few relational operations
>>>>>>> on the
>>>>>>> >>>>> region server: scan, filter, project, aggregate, sort
>>>>>>> (optionally with
>>>>>>> >>>>> limit). However, the sort and aggregate are necessarily
>>>>>>> "local". They
>>>>>>> >>>>> can only deal with data on that region server, and there needs
>>>>>>> to be a
>>>>>>> >>>>> further operation to combine the results from the region
>>>>>>> servers.
>>>>>>> >>>>>
>>>>>>> >>>>> The question is how to plan such queries. I think the answer
>>>>>>> is an
>>>>>>> >>>>> AggregateExchangeTransposeRule.
>>>>>>> >>>>>
>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is
>>>>>>> split into
>>>>>>> >>>>> multiple locations (partitions) and split it into a partial
>>>>>>> Aggregate
>>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that
>>>>>>> combines
>>>>>>> >>>>> those totals.
>>>>>>> >>>>>
>>>>>>> >>>>> How does the planner know that the Aggregate needs to be
>>>>>>> split? Since
>>>>>>> >>>>> the data's distribution has changed, there would need to be an
>>>>>>> >>>>> Exchange operator. It is the Exchange operator that triggers
>>>>>>> the rule
>>>>>>> >>>>> to fire.
>>>>>>> >>>>>
>>>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if the
>>>>>>> >>>>> partition key is the same as the aggregation key we don't need
>>>>>>> a
>>>>>>> >>>>> summarizing Aggregate, just a Union.
>>>>>>> >>>>>
>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>>> Drill-on-Phoenix
>>>>>>> >>>>> scenario, once the Aggregate has been pushed through the
>>>>>>> Exchange
>>>>>>> >>>>> (i.e. onto the drill-bit residing on the region server) we can
>>>>>>> then
>>>>>>> >>>>> push the DrillAggregate across the drill-to-phoenix membrane
>>>>>>> and make
>>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the region
>>>>>>> server.
>>>>>>> >>>>>
>>>>>>> >>>>> Related issues:
>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>> >>>>>
>>>>>>> >>>>> Julian
>>>>>>> >>>>>
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> >>>
>>>>>>> >>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Partial aggregation in Drill-on-Phoenix

Posted by Maryann Xue <ma...@gmail.com>.
One thing from what I asked James offline yesterday, and maybe we can
discuss a little bit in today's meeting:

Phoenix uses a list of lists of Scan objects to indicate Region boundaries
and guideposts, and if the top-level list contains more than one element it
means that the results from different Scanner/ResultIterator should be
merge-sorted. We now use this list in Drill integration to generate
different batches or slices. I see from the Drill plan of a simple select
like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of the
PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
So optimally,
1) this should be a merge-sort (to be more accurate, a merge)
2) furthermore, if Drill has something to indicate the order among slices
and batches, we could even turn it into a concat.

The structure of this Scan list might be helpful for 2), or we may have
some Logical representation for this. Otherwise, we can simply flatten this
list to a one-dimensional list as we do now (in my ci yesterday).
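
To illustrate point 2): a minimal sketch, in plain Java, of the check that
would let a merge be replaced by a concat. SliceConcat is a hypothetical
name, rows are reduced to integer keys, and each slice is assumed to be
sorted internally. How Drill would actually learn the order among slices is
the open question above and is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not a Drill or Phoenix class. */
final class SliceConcat {

  /**
   * True when every non-empty slice ends at or before the first key of the
   * next non-empty slice, so reading the slices in list order is already a
   * globally sorted stream.
   */
  static boolean isGloballyOrdered(List<List<Integer>> slices) {
    Integer previousLast = null;
    for (List<Integer> slice : slices) {
      if (slice.isEmpty()) {
        continue;
      }
      if (previousLast != null && previousLast > slice.get(0)) {
        return false; // ranges overlap, so a real merge is required
      }
      previousLast = slice.get(slice.size() - 1);
    }
    return true;
  }

  /** Plain concatenation in list order; valid only if isGloballyOrdered is true. */
  static List<Integer> concat(List<List<Integer>> slices) {
    List<Integer> out = new ArrayList<>();
    for (List<Integer> slice : slices) {
      out.addAll(slice);
    }
    return out;
  }
}
```

With slices that follow region boundaries in rowkey order the check passes
and no comparison between slices is needed at read time; if any two slices
overlap, the plan has to fall back to a merge.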



Thanks,
Maryann

On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <ma...@gmail.com> wrote:

> Yes, but the partially aggregated results will not contain any duplicate
> rowkeys, since they are also group-by keys. What we need is the aggregators
> and call aggregate for each row. We can write a new simpler ResultIterator
> to replace this, but for now it should work correctly.
>
> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <ja...@apache.org>
> wrote:
>
>> The results we get back from the server-side scan are already the partial
>> aggregated values we need. GroupedAggregatingResultIterator will
>> collapse adjacent Tuples together which happen to have the same row key.
>> I'm not sure we want/need this to happen. Instead I think we just need to
>> decode the aggregated values directly from the result of the scan.
>>
>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <ma...@gmail.com>
>> wrote:
>>
>>> Hi James,
>>>
>>> bq. A few questions for you: not sure I understand the changes you made
>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>> produce results with a single tuple per group by key. In Phoenix, the
>>> GroupedAggregatingResultIterator's function in life is to do the final
>>> merge. Note too that the results aren't sorted that come back from the
>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>> by the group by key). Or is this just to help in decoding the values coming
>>> back from the scan?
>>>
>>> It is necessary. I suppose what we should return as a partial result
>>> from PhoenixRecordReader is exactly the same as what we do in standalone
>>> Phoenix+Calcite, except that the result is partial or say incomplete. For
>>> example, we have "select a, count(*) from t group by a", we should return
>>> rows that have "a" as the first expression value, and "count(*)" as the
>>> second expression value. For this "count" expression, it actually needs a
>>> ClientAggregator for evaluation, and that's what this
>>> GroupedAggregatingResultIterator is used for.
>>> Since "each server-side scan will produce results with a single tuple
>>> per group by key", and PhoenixRecordReader is only dealing with one
>>> server-side result each time, we don't care how the group-by keys are
>>> arranged (ordered or unordered). Actually
>>> GroupedAggregatingResultIterator is not the group-by iterator we use
>>> for AggregatePlan. It does not "combine". It treats every row as a
>>> different group, by returning its rowkey as the group-by key (
>>> GroupedAggregatingResultIterator.java:56).
>>>
>>> In short, this iterator is for decoding the server-side values. So we
>>> may want to optimize this logic by removing this serialization and
>>> deserialization and having only one set of aggregators in future.
>>>
>>> bq. Also, not sure what impact it has in the way we "combine" the scans
>>> in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>> as each of our scans could include duplicate group by keys. Is it ok to
>>> combine them in this case?
>>>
>>> It should not matter, or at least is not related to the problem I'm now
>>> having.
>>>
>>> bq. One more question: how is the group by key communicated back to
>>> Drill?
>>>
>>> According to the HashAggPrule, if it decides to create a two-phase
>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>> PhoenixHashAggPrule). I assume then the partial results get shuffled based
>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>> final step is the Drill hash aggregation.
>>>
>>>
>>> This is my test table "A.BEER", which has four columns: "B", "E1", "E2",
>>> "R", all of type INTEGER. And the data is generated like this:
>>> for (x=1 to N) { //currently N=1000
>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>> }
>>>
>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP
>>> BY e1".
>>> The expected result should be:
>>> 0 100
>>> 1 100
>>> 2 100
>>> 3 100
>>> 4 100
>>> 5 100
>>> 6 100
>>> 7 100
>>> 8 100
>>> 9 100
>>> The actual result was:
>>> 6 0
>>> 7 0
>>> 8 0
>>> 9 0
>>> 0 0
>>> 1 100
>>> 2 100
>>> 3 100
>>> 4 100
>>> 5 100
>>>
>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer GROUP BY
>>> e2".
>>> Similarly, the expected result should have group-by keys from 0 to 99,
>>> each having a value of 10 as the count, while the actual result was:
>>> from group-by key 86 to 99, together with 0, their count values were all
>>> 0; the rest (1 to 85) all had the correct value 10.
>>>
>>> Looks to me that the scans were good but there was a problem with one of
>>> the hash buckets.
>>>
>>> Thanks,
>>> Maryann
>>>
>>>
>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <ja...@apache.org>
>>> wrote:
>>>
>>>> Nice progress, Maryann.
>>>>
>>>> A few questions for you: not sure I understand the changes you made to
>>>> PhoenixRecordReader. Is it necessary to wrap the server-side scan results
>>>> in a GroupedAggregatingResultIterator? Each server-side scan will produce
>>>> results with a single tuple per group by key. In Phoenix, the
>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>> merge. Note too that the results aren't sorted that come back from the
>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>> by the group by key). Or is this just to help in decoding the values coming
>>>> back from the scan?
>>>>
>>>> Also, not sure what impact it has in the way we "combine" the scans in
>>>> our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
>>>> each of our scans could include duplicate group by keys. Is it ok to
>>>> combine them in this case?
>>>>
>>>> One more question: how is the group by key communicated back to Drill?
>>>>
>>>> Thanks,
>>>> James
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <ma...@gmail.com>
>>>> wrote:
>>>>
>>>>> Added a few fixes in the pull request. Tested with two regions, turned
>>>>> out that half of the result is empty (count = 0).
>>>>> Not sure if there's anything wrong with
>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>> .
>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>
>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>
>>>>> diff --git
>>>>> a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>> b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>
>>>>> index b911f6b..58bc918 100644
>>>>>
>>>>> ---
>>>>> a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>
>>>>> +++
>>>>> b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>
>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>
>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>
>>>>>    // currently won't generate a 2 phase plan.
>>>>>
>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call,
>>>>> DrillAggregateRel aggregate) {
>>>>>
>>>>> -    PlannerSettings settings =
>>>>> PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>
>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>
>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>
>>>>> -    if (! settings.isMultiPhaseAggEnabled() ||
>>>>> settings.isSingleMode() || smallInput) {
>>>>>
>>>>> -      return false;
>>>>>
>>>>> -    }
>>>>>
>>>>> +//    PlannerSettings settings =
>>>>> PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>
>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>
>>>>> +//    boolean smallInput = child.getRows() <
>>>>> settings.getSliceTarget();
>>>>>
>>>>> +//    if (! settings.isMultiPhaseAggEnabled() ||
>>>>> settings.isSingleMode() || smallInput) {
>>>>>
>>>>> +//      return false;
>>>>>
>>>>> +//    }
>>>>>
>>>>>
>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>
>>>>>        String name = aggCall.getAggregation().getName();
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Maryann
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org> wrote:
>>>>>
>>>>>> Drill's current approach seems adequate for Drill alone but extending
>>>>>> it to a heterogenous system that includes Phoenix seems like a hack.
>>>>>>
>>>>>> I think you should only create Prels for algebra nodes that you know
>>>>>> for sure are going to run on the Drill engine. If there's a
>>>>>> possibility that it would run in another engine such as Phoenix then
>>>>>> they should still be logical.
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <ma...@gmail.com>
>>>>>> wrote:
>>>>>> > The partial aggregate seems to be working now, with one interface
>>>>>> extension
>>>>>> > and one bug fix in the Phoenix project. Will do some code cleanup
>>>>>> and
>>>>>> > create a pull request soon.
>>>>>> >
>>>>>> > Still there was a hack in the Drill project which I made to force
>>>>>> 2-phase
>>>>>> > aggregation. I'll try to fix that.
>>>>>> >
>>>>>> > Jacques, I have one question though, how can I verify that there
>>>>>> are more
>>>>>> > than one slice and the shuffle happens?
>>>>>> >
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Maryann
>>>>>> >
>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <
>>>>>> jamestaylor@apache.org> wrote:
>>>>>> >
>>>>>> >> Maryann,
>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>> required
>>>>>> >> for a merge sort to occur - there's something that does that, but
>>>>>> it's not
>>>>>> >> expected to be used in this context currently.
>>>>>> >>
>>>>>> >> IMHO, there's more of a clear value in getting the aggregation to
>>>>>> use
>>>>>> >> Phoenix first, so I'd recommend going down that road as Jacques
>>>>>> mentioned
>>>>>> >> above if possible. Once that's working, we can circle back to the
>>>>>> partial
>>>>>> >> sort.
>>>>>> >>
>>>>>> >> Thoughts?
>>>>>> >> James
>>>>>> >>
>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <
>>>>>> maryann.xue@gmail.com>
>>>>>> >> wrote:
>>>>>> >>
>>>>>> >>> I actually tried implementing partial sort with
>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might
>>>>>> be a
>>>>>> >>> little easier to start with than partial aggregation. But I found
>>>>>> that even
>>>>>> >>> though the code worked (returned the right results), the Drill
>>>>>> side sort
>>>>>> >>> turned out to be an ordinary sort instead of a merge which it
>>>>>> should have
>>>>>> >>> been. Any idea of how to fix that?
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Thanks,
>>>>>> >>> Maryann
>>>>>> >>>
>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <
>>>>>> jacques@dremio.com>
>>>>>> >>> wrote:
>>>>>> >>>
>>>>>> >>>> Right now this type of work is done here:
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>> >>>>
>>>>>> >>>>
>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>> >>>>
>>>>>> >>>> With Distribution Trait application here:
>>>>>> >>>>
>>>>>> >>>>
>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>> >>>>
>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>> providing a rule
>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix
>>>>>> convention as
>>>>>> >>>> input. It would replace everywhere but would only be plannable
>>>>>> when it is
>>>>>> >>>> the first phase of aggregation.
>>>>>> >>>>
>>>>>> >>>> Thoughts?
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> --
>>>>>> >>>> Jacques Nadeau
>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>> >>>>
>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org>
>>>>>> wrote:
>>>>>> >>>>
>>>>>> >>>>> Phoenix is able to perform quite a few relational operations on
>>>>>> the
>>>>>> >>>>> region server: scan, filter, project, aggregate, sort
>>>>>> (optionally with
>>>>>> >>>>> limit). However, the sort and aggregate are necessarily
>>>>>> "local". They
>>>>>> >>>>> can only deal with data on that region server, and there needs
>>>>>> to be a
>>>>>> >>>>> further operation to combine the results from the region
>>>>>> servers.
>>>>>> >>>>>
>>>>>> >>>>> The question is how to plan such queries. I think the answer is
>>>>>> an
>>>>>> >>>>> AggregateExchangeTransposeRule.
>>>>>> >>>>>
>>>>>> >>>>> The rule would spot an Aggregate on a data source that is split
>>>>>> into
>>>>>> >>>>> multiple locations (partitions) and split it into a partial
>>>>>> Aggregate
>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that
>>>>>> combines
>>>>>> >>>>> those totals.
>>>>>> >>>>>
>>>>>> >>>>> How does the planner know that the Aggregate needs to be split?
>>>>>> Since
>>>>>> >>>>> the data's distribution has changed, there would need to be an
>>>>>> >>>>> Exchange operator. It is the Exchange operator that triggers
>>>>>> the rule
>>>>>> >>>>> to fire.
>>>>>> >>>>>
>>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if the
>>>>>> >>>>> partition key is the same as the aggregation key we don't need a
>>>>>> >>>>> summarizing Aggregate, just a Union.
>>>>>> >>>>>
>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>> Drill-on-Phoenix
>>>>>> >>>>> scenario, once the Aggregate has been pushed through the
>>>>>> Exchange
>>>>>> >>>>> (i.e. onto the drill-bit residing on the region server) we can
>>>>>> then
>>>>>> >>>>> push the DrillAggregate across the drill-to-phoenix membrane
>>>>>> and make
>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the region
>>>>>> server.
>>>>>> >>>>>
>>>>>> >>>>> Related issues:
>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>> >>>>>
>>>>>> >>>>> Julian
>>>>>> >>>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Partial aggregation in Drill-on-Phoenix

Posted by Maryann Xue <ma...@gmail.com>.
Yes, but the partially aggregated results will not contain any duplicate
rowkeys, since the rowkeys are also the group-by keys. What we need from it
is the aggregators, and a call to aggregate for each row. We could write a
new, simpler ResultIterator to replace this, but for now it should work
correctly.
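
To make the point about duplicate rowkeys concrete, here is a minimal sketch
in plain Java (not Phoenix code) of a step that merges adjacent rows sharing
a key. The class AdjacentCollapse and its method are hypothetical names used
only for illustration; the real GroupedAggregatingResultIterator works on
Phoenix Tuples and aggregators rather than on these simple pairs. When every
key is distinct, as in the partial result of a single scan, the step returns
its input unchanged.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in; it only mimics "collapse adjacent rows with the same key".
class AdjacentCollapse {

    // Merges runs of adjacent entries that share a key by summing their counts.
    static List<Map.Entry<String, Long>> collapse(List<Map.Entry<String, Long>> rows) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> row : rows) {
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).getKey().equals(row.getKey())) {
                // Same key as the previous row: fold this row into it.
                Long sum = out.get(last).getValue() + row.getValue();
                out.set(last, new SimpleEntry<String, Long>(row.getKey(), sum));
            } else {
                out.add(new SimpleEntry<String, Long>(row.getKey(), row.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> partial = new ArrayList<>();
        partial.add(new SimpleEntry<String, Long>("k2", 7L));
        partial.add(new SimpleEntry<String, Long>("k0", 3L));
        partial.add(new SimpleEntry<String, Long>("k1", 5L));
        // Keys are distinct, so nothing is merged and the order is preserved.
        System.out.println(collapse(partial));
    }
}
```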

On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <ja...@apache.org> wrote:

> The results we get back from the server-side scan are already the partial
> aggregated values we need. GroupedAggregatingResultIterator will collapse
> adjacent Tuples together which happen to have the same row key. I'm not
> sure we want/need this to happen. Instead I think we just need to decode
> the aggregated values directly from the result of the scan.

Re: Partial aggregation in Drill-on-Phoenix

Posted by James Taylor <ja...@apache.org>.
The results we get back from the server-side scan are already the partial
aggregated values we need. GroupedAggregatingResultIterator will collapse
adjacent Tuples together which happen to have the same row key. I'm not
sure we want/need this to happen. Instead I think we just need to decode
the aggregated values directly from the result of the scan.

On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <ma...@gmail.com> wrote:

> Hi James,
>
> bq. A few questions for you: not sure I understand the changes you made to
> PhoenixRecordReader. Is it necessary to wrap the server-side scan results
> in a GroupedAggregatingResultIterator? Each server-side scan will produce
> results with a single tuple per group by key. In Phoenix, the
> GroupedAggregatingResultIterator's function in life is to do the final
> merge. Note too that the results aren't sorted that come back from the
> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
> by the group by key). Or is this just to help in decoding the values coming
> back from the scan?
>
> It is necessary. I suppose what we should return as a partial result from
> PhoenixRecordReader is exactly the same as what we do in standalone
> Phoenix+Calcite, except that the result is partial, or say incomplete. For
> example, for "select a, count(*) from t group by a", we should return
> rows that have "a" as the first expression value, and "count(*)" as the
> second expression value. This "count" expression actually needs a
> ClientAggregator for evaluation, and that's what this
> GroupedAggregatingResultIterator is used for.
> Since "each server-side scan will produce results with a single tuple per
> group by key", and PhoenixRecordReader is only dealing with one server-side
> result each time, we don't care how the group-by keys are arranged (ordered
> or unordered). Actually GroupedAggregatingResultIterator is not the
> group-by iterator we use for AggregatePlan. It does not "combine". It
> treats every row as a different group, by returning its rowkey as the
> group-by key (GroupedAggregatingResultIterator.java:56).
>
> In short, this iterator is for decoding the server-side values. So we may
> want to optimize this logic by removing this serialization and
> deserialization and having only one set of aggregators in future.
>
> bq. Also, not sure what impact it has in the way we "combine" the scans in
> our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
> each of our scans could include duplicate group by keys. Is it ok to
> combine them in this case?
>
> It should not matter, or at least is not related to the problem I'm now
> having.
>
> bq. One more question: how is the group by key communicated back to Drill?
>
> According to the HashAggPrule, if it decides to create a two-phase
> aggregate, the first phase is now handled by Phoenix (after applying the
> PhoenixHashAggPrule). I assume then the partial results get shuffled based
> on the hash of their group-by keys (returned by PhoenixRecordReader). The
> final step is the Drill hash aggregation.
>
>
> This is my test table "A.BEER", which has four columns: "B", "E1", "E2",
> "R", all of INTEGER type. The data is generated like this:
> for (x=1 to N) { //currently N=1000
>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
> }
>
> The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP
> BY e1".
> The expected result should be:
> 0 100
> 1 100
> 2 100
> 3 100
> 4 100
> 5 100
> 6 100
> 7 100
> 8 100
> 9 100
> The actual result was:
> 6 0
> 7 0
> 8 0
> 9 0
> 0 0
> 1 100
> 2 100
> 3 100
> 4 100
> 5 100
>
> I also tried another query, "SELECT e2, count(*) FROM a.beer GROUP BY
> e2".
> Similarly, the expected result should have group-by keys from 0 to 99,
> each with a count of 10. In the actual result, the counts for group-by
> keys 86 to 99, together with 0, were all 0; the rest (1 to 85) all had
> the correct value 10.
>
> Looks to me that the scans were good but there was a problem with one of
> the hash buckets.
>
> Thanks,
> Maryann

Re: Partial aggregation in Drill-on-Phoenix

Posted by Maryann Xue <ma...@gmail.com>.
Hi James,

bq. A few questions for you: not sure I understand the changes you made to
PhoenixRecordReader. Is it necessary to wrap the server-side scan results
in a GroupedAggregatingResultIterator? Each server-side scan will produce
results with a single tuple per group by key. In Phoenix, the
GroupedAggregatingResultIterator's function in life is to do the final
merge. Note too that the results aren't sorted that come back from the
aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
by the group by key). Or is this just to help in decoding the values coming
back from the scan?

It is necessary. I suppose what we should return as a partial result from
PhoenixRecordReader is exactly the same as what we do in standalone
Phoenix+Calcite, except that the result is partial, or say incomplete. For
example, for "select a, count(*) from t group by a", we should return
rows that have "a" as the first expression value, and "count(*)" as the
second expression value. This "count" expression actually needs a
ClientAggregator for evaluation, and that's what this
GroupedAggregatingResultIterator is used for.
Since "each server-side scan will produce results with a single tuple per
group by key", and PhoenixRecordReader is only dealing with one server-side
result each time, we don't care how the group-by keys are arranged (ordered
or unordered). Actually GroupedAggregatingResultIterator is not the
group-by iterator we use for AggregatePlan. It does not "combine". It
treats every row as a different group, by returning its rowkey as the
group-by key (GroupedAggregatingResultIterator.java:56).

In short, this iterator is for decoding the server-side values. So we may
want to optimize this logic by removing this serialization and
deserialization and having only one set of aggregators in future.

bq. Also, not sure what impact it has in the way we "combine" the scans in
our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
each of our scans could include duplicate group by keys. Is it ok to
combine them in this case?

It should not matter, or at least is not related to the problem I'm now
having.

bq. One more question: how is the group by key communicated back to Drill?

According to the HashAggPrule, if it decides to create a two-phase
aggregate, the first phase is now handled by Phoenix (after applying the
PhoenixHashAggPrule). I assume then the partial results get shuffled based
on the hash of their group-by keys (returned by PhoenixRecordReader). The
final step is the Drill hash aggregation.
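
As a rough illustration of the flow described above, here is a sketch in
plain Java; it is not Drill or Phoenix code. It computes a partial COUNT(*)
per scan, routes the partial rows to buckets by the hash of the group-by
key, and then merges them. The class name TwoPhaseCount, its method names,
and the two-bucket setup are assumptions made for the example. Note that
the final phase of a COUNT is a sum of the partial counts, not another
count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a two-phase COUNT(*) ... GROUP BY.
class TwoPhaseCount {

    // Phase 1 (the phase handed to Phoenix in this thread): one partial count per key, per scan.
    static Map<Integer, Long> partialCount(List<Integer> keys) {
        Map<Integer, Long> partial = new HashMap<>();
        for (Integer key : keys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Shuffle: route each partial row to a bucket chosen by the hash of its group-by key,
    // so that all partial rows for one key end up in the same bucket.
    static List<List<Map.Entry<Integer, Long>>> shuffle(List<Map<Integer, Long>> partials, int buckets) {
        List<List<Map.Entry<Integer, Long>>> out = new ArrayList<>();
        for (int i = 0; i < buckets; i++) {
            out.add(new ArrayList<>());
        }
        for (Map<Integer, Long> partial : partials) {
            for (Map.Entry<Integer, Long> row : partial.entrySet()) {
                int bucket = Math.floorMod(row.getKey().hashCode(), buckets);
                out.get(bucket).add(row);
            }
        }
        return out;
    }

    // Phase 2: the final COUNT for a key is the SUM of its partial counts.
    static Map<Integer, Long> finalCount(List<List<Map.Entry<Integer, Long>>> shuffled) {
        Map<Integer, Long> total = new TreeMap<>();
        for (List<Map.Entry<Integer, Long>> bucket : shuffled) {
            for (Map.Entry<Integer, Long> row : bucket) {
                total.merge(row.getKey(), row.getValue(), Long::sum);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map<Integer, Long>> partials = new ArrayList<>();
        partials.add(partialCount(Arrays.asList(1, 2, 2, 3)));  // scan of the first region
        partials.add(partialCount(Arrays.asList(2, 3, 3, 3)));  // scan of the second region
        System.out.println(finalCount(shuffle(partials, 2)));   // prints {1=1, 2=3, 3=4}
    }
}
```

Keys 2 and 3 appear in both partial results here, which is why the second
phase still has to combine rows even though each scan returns one row per
key.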


This is my test table "A.BEER", which has four columns: "B", "E1", "E2",
"R", all of INTEGER type. The data is generated like this:
for (x=1 to N) { //currently N=1000
 UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
}

The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP BY
e1".
The expected result should be:
0 100
1 100
2 100
3 100
4 100
5 100
6 100
7 100
8 100
9 100
The actual result was:
6 0
7 0
8 0
9 0
0 0
1 100
2 100
3 100
4 100
5 100

I also tried another query: "SELECT e2, count(*) FROM a.beer GROUP BY e2".
Similarly, the expected result has group-by keys 0 to 99, each with a count
of 10. In the actual result, keys 86 to 99, together with 0, all had a count
of 0; the rest (1 to 85) all had the correct value 10.
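
The expected counts for both queries can be double-checked without Drill or
Phoenix. This is just a small sketch that mirrors the UPSERT loop above
(the class and method names are mine, for illustration only):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: recomputes the expected GROUP BY counts for the test data.
final class ExpectedCounts {

  // Row x (1..n) has E1 = x % 10 and E2 = x % 100, so grouping by either
  // column is the same as counting x % modulus.
  static Map<Integer, Integer> countByModulus(int n, int modulus) {
    Map<Integer, Integer> counts = new TreeMap<>();
    for (int x = 1; x <= n; x++) {
      counts.merge(x % modulus, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(countByModulus(1000, 10));   // GROUP BY e1
    System.out.println(countByModulus(1000, 100));  // GROUP BY e2
  }
}
```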

It looks to me like the scans were fine but there was a problem with one of
the hash buckets.

Thanks,
Maryann


On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <ja...@apache.org> wrote:

> Nice progress, Maryann.
>
> A few questions for you: not sure I understand the changes you made to
> PhoenixRecordReader. Is it necessary to wrap the server-side scan results
> in a GroupedAggregatingResultIterator? Each server-side scan will produce
> results with a single tuple per group by key. In Phoenix, the
> GroupedAggregatingResultIterator's function in life is to do the final
> merge. Note too that the results aren't sorted that come back from the
> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
> by the group by key). Or is this just to help in decoding the values coming
> back from the scan?
>
> Also, not sure what impact it has in the way we "combine" the scans in our
> Drill parallelization code (PhoenixGroupScan.applyAssignments()), as each
> of our scans could include duplicate group by keys. Is it ok to combine
> them in this case?
>
> One more question: how is the group by key communicated back to Drill?
>
> Thanks,
> James

Re: Partial aggregation in Drill-on-Phoenix

Posted by James Taylor <ja...@apache.org>.
Nice progress, Maryann.

A few questions for you: not sure I understand the changes you made to
PhoenixRecordReader. Is it necessary to wrap the server-side scan results
in a GroupedAggregatingResultIterator? Each server-side scan will produce
results with a single tuple per group by key. In Phoenix, the
GroupedAggregatingResultIterator's function in life is to do the final
merge. Note too that the results aren't sorted that come back from the
aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
by the group by key). Or is this just to help in decoding the values coming
back from the scan?

Also, not sure what impact it has in the way we "combine" the scans in our
Drill parallelization code (PhoenixGroupScan.applyAssignments()), as each
of our scans could include duplicate group by keys. Is it ok to combine
them in this case?

One more question: how is the group by key communicated back to Drill?

Thanks,
James


On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <ma...@gmail.com> wrote:

> Added a few fixes in the pull request. Tested with two regions; it turned
> out that half of the results were empty (count = 0).
> Not sure if there's anything wrong with
> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
> .
> Like Julian said, this rule looks a bit hacky.
>
> To force a 2-phase HashAgg, I made a temporary change as well:
>
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
> index b911f6b..58bc918 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>    // If any of the aggregate functions are not one of these, then we
>    // currently won't generate a 2 phase plan.
>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
> -    RelNode child = call.rel(0).getInputs().get(0);
> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
> -      return false;
> -    }
> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
> +//    RelNode child = call.rel(0).getInputs().get(0);
> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
> +//      return false;
> +//    }
>  
>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>        String name = aggCall.getAggregation().getName();
>
>
> Thanks,
> Maryann

Re: Partial aggregation in Drill-on-Phoenix

Posted by Maryann Xue <ma...@gmail.com>.
Added a few fixes in the pull request. Tested with two regions; it turned
out that half of the results were empty (count = 0).
Not sure if there's anything wrong with
https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
.
Like Julian said, this rule looks a bit hacky.

To force a 2-phase HashAgg, I made a temporary change as well:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index b911f6b..58bc918 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
   // If any of the aggregate functions are not one of these, then we
   // currently won't generate a 2 phase plan.
   protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
-    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    RelNode child = call.rel(0).getInputs().get(0);
-    boolean smallInput = child.getRows() < settings.getSliceTarget();
-    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
-      return false;
-    }
+//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+//    RelNode child = call.rel(0).getInputs().get(0);
+//    boolean smallInput = child.getRows() < settings.getSliceTarget();
+//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
+//      return false;
+//    }
 
     for (AggregateCall aggCall : aggregate.getAggCallList()) {
       String name = aggCall.getAggregation().getName();
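
For reference, the guard that this change disables boils down to the
predicate below. This is only a stand-alone model with made-up names, not the
Drill code itself. With a 1000-row test table, the input counts as "small"
for any slice target above 1000, so the guard rejects the 2-phase plan. In
the real method, passing the guard is not sufficient either, since the
aggregate function names are checked afterwards. Lowering the slice target
(I believe the session option is `planner.slice_target`) might be a less
invasive way to get the same effect, but I have not verified that here.

```java
// Hypothetical model of the guard in create2PhasePlan(); not the Drill implementation.
final class TwoPhaseGate {

  // Returns true when the guard lets planning continue towards a 2-phase plan.
  static boolean passesGuard(boolean multiPhaseEnabled, boolean singleMode,
                             double inputRows, long sliceTarget) {
    boolean smallInput = inputRows < sliceTarget;
    return !(!multiPhaseEnabled || singleMode || smallInput);
  }

  public static void main(String[] args) {
    // The slice target values below are illustrative, not Drill defaults.
    System.out.println(passesGuard(true, false, 1000, 100_000));
    System.out.println(passesGuard(true, false, 1000, 1));
  }
}
```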


Thanks,
Maryann



On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org> wrote:

> Drill's current approach seems adequate for Drill alone but extending
> it to a heterogenous system that includes Phoenix seems like a hack.
>
> I think you should only create Prels for algebra nodes that you know
> for sure are going to run on the Drill engine. If there's a
> possibility that it would run in another engine such as Phoenix then
> they should still be logical.

Re: Partial aggregation in Drill-on-Phoenix

Posted by Jacques Nadeau <ja...@dremio.com>.
I'm not sure how to accomplish this cleanly. The concept of two-phased
agg-key distributed aggregation (and exchanges in general) seems very much
a physical concept. Since Phoenix can only do half this operation (in
parallel), I'm having trouble figuring out what the logical plan would look
like if we did this transformation in the logical planning phase. (In
general, I think part of the problem is that each phase of planning can
output only a single plan.) Since we have chosen to break the planning into
~5 phases due to performance, we have to pick where the transformations are
most appropriate.

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Tue, Oct 6, 2015 at 11:31 AM, Julian Hyde <jh...@apache.org> wrote:

> Drill's current approach seems adequate for Drill alone but extending
> it to a heterogenous system that includes Phoenix seems like a hack.
>
> I think you should only create Prels for algebra nodes that you know
> for sure are going to run on the Drill engine. If there's a
> possibility that it would run in another engine such as Phoenix then
> they should still be logical.

Re: Partial aggregation in Drill-on-Phoenix

Posted by Julian Hyde <jh...@apache.org>.
Drill's current approach seems adequate for Drill alone but extending
it to a heterogenous system that includes Phoenix seems like a hack.

I think you should only create Prels for algebra nodes that you know
for sure are going to run on the Drill engine. If there's a
possibility that it would run in another engine such as Phoenix then
they should still be logical.

On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <ma...@gmail.com> wrote:
> The partial aggregate seems to be working now, with one interface extension
> and one bug fix in the Phoenix project. Will do some code cleanup and
> create a pull request soon.
>
> Still there was a hack in the Drill project which I made to force 2-phase
> aggregation. I'll try to fix that.
>
> Jacques, I have one question though, how can I verify that there are more
> than one slice and the shuffle happens?
>
>
> Thanks,
> Maryann
>
> On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <ja...@apache.org> wrote:
>
>> Maryann,
>> I believe Jacques mentioned that a little bit of refactoring is required
>> for a merge sort to occur - there's something that does that, but it's not
>> expected to be used in this context currently.
>>
>> IMHO, there's more of a clear value in getting the aggregation to use
>> Phoenix first, so I'd recommend going down that road as Jacques mentioned
>> above if possible. Once that's working, we can circle back to the partial
>> sort.
>>
>> Thoughts?
>> James
>>
>> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <ma...@gmail.com>
>> wrote:
>>
>>> I actually tried implementing partial sort with
>>> https://github.com/jacques-n/drill/pull/4, which I figured might be a
>>> little easier to start with than partial aggregation. But I found that even
>>> though the code worked (returned the right results), the Drill side sort
>>> turned out to be an ordinary sort instead of a merge, which it should have
>>> been. Any idea of how to fix that?
>>>
>>>
>>> Thanks,
>>> Maryann
>>>
>>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <ja...@dremio.com>
>>> wrote:
>>>
>>>> Right now this type of work is done here:
>>>>
>>>>
>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>
>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>
>>>> With Distribution Trait application here:
>>>>
>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>
>>>> To me, the easiest way to solve the Phoenix issue is by providing a rule
>>>> that matches HashAgg and StreamAgg but requires Phoenix convention as
>>>> input. It would replace everywhere but would only be plannable when it is
>>>> the first phase of aggregation.
>>>>
>>>> Thoughts?
>>>>
>>>>
>>>>
>>>> --
>>>> Jacques Nadeau
>>>> CTO and Co-Founder, Dremio
>>>>
>>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org> wrote:
>>>>
>>>>> Phoenix is able to perform quite a few relational operations on the
>>>>> region server: scan, filter, project, aggregate, sort (optionally with
>>>>> limit). However, the sort and aggregate are necessarily "local". They
>>>>> can only deal with data on that region server, and there needs to be a
>>>>> further operation to combine the results from the region servers.
>>>>>
>>>>> The question is how to plan such queries. I think the answer is an
>>>>> AggregateExchangeTransposeRule.
>>>>>
>>>>> The rule would spot an Aggregate on a data source that is split into
>>>>> multiple locations (partitions) and split it into a partial Aggregate
>>>>> that computes sub-totals and a summarizing Aggregate that combines
>>>>> those totals.
>>>>>
>>>>> How does the planner know that the Aggregate needs to be split? Since
>>>>> the data's distribution has changed, there would need to be an
>>>>> Exchange operator. It is the Exchange operator that triggers the rule
>>>>> to fire.
>>>>>
>>>>> There are some special cases. If the data is sorted as well as
>>>>> partitioned (say because the local aggregate uses a sort-based
>>>>> algorithm) we could maybe use a more efficient plan. And if the
>>>>> partition key is the same as the aggregation key we don't need a
>>>>> summarizing Aggregate, just a Union.
>>>>>
>>>>> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix
>>>>> scenario, once the Aggregate has been pushed through the Exchange
>>>>> (i.e. onto the drill-bit residing on the region server) we can then
>>>>> push the DrillAggregate across the drill-to-phoenix membrane and make
>>>>> it into a PhoenixServerAggregate that executes in the region server.
>>>>>
>>>>> Related issues:
>>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>
>>>>> Julian
>>>>>
>>>>
>>>>
>>>
>>

Re: Partial aggregation in Drill-on-Phoenix

Posted by Maryann Xue <ma...@gmail.com>.
The partial aggregate seems to be working now, with one interface extension
and one bug fix in the Phoenix project. Will do some code cleanup and
create a pull request soon.

Still there was a hack in the Drill project which I made to force 2-phase
aggregation. I'll try to fix that.

Jacques, I have one question though: how can I verify that there is more
than one slice and that the shuffle happens?


Thanks,
Maryann

On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <ja...@apache.org> wrote:

> Maryann,
> I believe Jacques mentioned that a little bit of refactoring is required
> for a merge sort to occur - there's something that does that, but it's not
> expected to be used in this context currently.
>
> IMHO, there's more of a clear value in getting the aggregation to use
> Phoenix first, so I'd recommend going down that road as Jacques mentioned
> above if possible. Once that's working, we can circle back to the partial
> sort.
>
> Thoughts?
> James
>
> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <ma...@gmail.com>
> wrote:
>
>> I actually tried implementing partial sort with
>> https://github.com/jacques-n/drill/pull/4, which I figured might be a
>> little easier to start with than partial aggregation. But I found that even
>> though the code worked (returned the right results), the Drill side sort
>> turned out to be an ordinary sort instead of a merge, which it should have
>> been. Any idea of how to fix that?
>>
>>
>> Thanks,
>> Maryann
>>
>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <ja...@dremio.com>
>> wrote:
>>
>>> Right now this type of work is done here:
>>>
>>>
>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>
>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>
>>> With Distribution Trait application here:
>>>
>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>
>>> To me, the easiest way to solve the Phoenix issue is by providing a rule
>>> that matches HashAgg and StreamAgg but requires Phoenix convention as
>>> input. It would replace everywhere but would only be plannable when it is
>>> the first phase of aggregation.
>>>
>>> Thoughts?
>>>
>>>
>>>
>>> --
>>> Jacques Nadeau
>>> CTO and Co-Founder, Dremio
>>>
>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org> wrote:
>>>
>>>> Phoenix is able to perform quite a few relational operations on the
>>>> region server: scan, filter, project, aggregate, sort (optionally with
>>>> limit). However, the sort and aggregate are necessarily "local". They
>>>> can only deal with data on that region server, and there needs to be a
>>>> further operation to combine the results from the region servers.
>>>>
>>>> The question is how to plan such queries. I think the answer is an
>>>> AggregateExchangeTransposeRule.
>>>>
>>>> The rule would spot an Aggregate on a data source that is split into
>>>> multiple locations (partitions) and split it into a partial Aggregate
>>>> that computes sub-totals and a summarizing Aggregate that combines
>>>> those totals.
>>>>
>>>> How does the planner know that the Aggregate needs to be split? Since
>>>> the data's distribution has changed, there would need to be an
>>>> Exchange operator. It is the Exchange operator that triggers the rule
>>>> to fire.
>>>>
>>>> There are some special cases. If the data is sorted as well as
>>>> partitioned (say because the local aggregate uses a sort-based
>>>> algorithm) we could maybe use a more efficient plan. And if the
>>>> partition key is the same as the aggregation key we don't need a
>>>> summarizing Aggregate, just a Union.
>>>>
>>>> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix
>>>> scenario, once the Aggregate has been pushed through the Exchange
>>>> (i.e. onto the drill-bit residing on the region server) we can then
>>>> push the DrillAggregate across the drill-to-phoenix membrane and make
>>>> it into a PhoenixServerAggregate that executes in the region server.
>>>>
>>>> Related issues:
>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>
>>>> Julian
>>>>
>>>
>>>
>>
>

Re: Partial aggregation in Drill-on-Phoenix

Posted by James Taylor <ja...@apache.org>.
Maryann,
I believe Jacques mentioned that a little bit of refactoring is required
for a merge sort to occur - there's something that does that, but it's not
expected to be used in this context currently.

IMHO, there's more of a clear value in getting the aggregation to use
Phoenix first, so I'd recommend going down that road as Jacques mentioned
above if possible. Once that's working, we can circle back to the partial
sort.

Thoughts?
James

On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <ma...@gmail.com> wrote:

> I actually tried implementing partial sort with
> https://github.com/jacques-n/drill/pull/4, which I figured might be a
> little easier to start with than partial aggregation. But I found that even
> though the code worked (returned the right results), the Drill side sort
> turned out to be an ordinary sort instead of a merge, which it should have
> been. Any idea of how to fix that?
>
>
> Thanks,
> Maryann
>
> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <ja...@dremio.com>
> wrote:
>
>> Right now this type of work is done here:
>>
>>
>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>
>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>
>> With Distribution Trait application here:
>>
>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>
>> To me, the easiest way to solve the Phoenix issue is by providing a rule
>> that matches HashAgg and StreamAgg but requires Phoenix convention as
>> input. It would replace everywhere but would only be plannable when it is
>> the first phase of aggregation.
>>
>> Thoughts?
>>
>>
>>
>> --
>> Jacques Nadeau
>> CTO and Co-Founder, Dremio
>>
>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org> wrote:
>>
>>> Phoenix is able to perform quite a few relational operations on the
>>> region server: scan, filter, project, aggregate, sort (optionally with
>>> limit). However, the sort and aggregate are necessarily "local". They
>>> can only deal with data on that region server, and there needs to be a
>>> further operation to combine the results from the region servers.
>>>
>>> The question is how to plan such queries. I think the answer is an
>>> AggregateExchangeTransposeRule.
>>>
>>> The rule would spot an Aggregate on a data source that is split into
>>> multiple locations (partitions) and split it into a partial Aggregate
>>> that computes sub-totals and a summarizing Aggregate that combines
>>> those totals.
>>>
>>> How does the planner know that the Aggregate needs to be split? Since
>>> the data's distribution has changed, there would need to be an
>>> Exchange operator. It is the Exchange operator that triggers the rule
>>> to fire.
>>>
>>> There are some special cases. If the data is sorted as well as
>>> partitioned (say because the local aggregate uses a sort-based
>>> algorithm) we could maybe use a more efficient plan. And if the
>>> partition key is the same as the aggregation key we don't need a
>>> summarizing Aggregate, just a Union.
>>>
>>> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix
>>> scenario, once the Aggregate has been pushed through the Exchange
>>> (i.e. onto the drill-bit residing on the region server) we can then
>>> push the DrillAggregate across the drill-to-phoenix membrane and make
>>> it into a PhoenixServerAggregate that executes in the region server.
>>>
>>> Related issues:
>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>
>>> Julian
>>>
>>
>>
>

Re: Partial aggregation in Drill-on-Phoenix

Posted by Maryann Xue <ma...@gmail.com>.
I actually tried implementing partial sort with
https://github.com/jacques-n/drill/pull/4, which I figured might be a
little easier to start with than partial aggregation. But I found that even
though the code worked (returned the right results), the Drill side sort
turned out to be an ordinary sort instead of a merge, which it should have
been. Any idea of how to fix that?
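
To make the distinction concrete, here is a minimal sketch in Python (not
Drill or Phoenix code; the per-region row lists are made up for
illustration). If every region server returns its rows already sorted, the
receiving side only needs a k-way merge of the sorted streams, whereas an
ordinary sort ignores that existing order and re-sorts everything.

```python
import heapq

# Hypothetical per-region results, each already sorted by the sort key.
region_results = [
    [1, 4, 9],
    [2, 3, 10],
    [5, 6, 7, 8],
]


def merge_sorted_partitions(partitions):
    """K-way merge of inputs that are each already sorted."""
    return list(heapq.merge(*partitions))


def full_sort(partitions):
    """Ordinary sort that ignores the existing order of each input."""
    return sorted(row for part in partitions for row in part)


merged = merge_sorted_partitions(region_results)
resorted = full_sort(region_results)
print(merged)
```

Both functions return the same rows in the same order; the merge simply
does less work, roughly O(n log k) for k sorted inputs instead of
O(n log n).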


Thanks,
Maryann

On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <ja...@dremio.com> wrote:

> Right now this type of work is done here:
>
>
> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>
> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>
> With Distribution Trait application here:
>
> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>
> To me, the easiest way to solve the Phoenix issue is by providing a rule
> that matches HashAgg and StreamAgg but requires Phoenix convention as
> input. It would replace everywhere but would only be plannable when it is
> the first phase of aggregation.
>
> Thoughts?
>
>
>
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org> wrote:
>
>> Phoenix is able to perform quite a few relational operations on the
>> region server: scan, filter, project, aggregate, sort (optionally with
>> limit). However, the sort and aggregate are necessarily "local". They
>> can only deal with data on that region server, and there needs to be a
>> further operation to combine the results from the region servers.
>>
>> The question is how to plan such queries. I think the answer is an
>> AggregateExchangeTransposeRule.
>>
>> The rule would spot an Aggregate on a data source that is split into
>> multiple locations (partitions) and split it into a partial Aggregate
>> that computes sub-totals and a summarizing Aggregate that combines
>> those totals.
>>
>> How does the planner know that the Aggregate needs to be split? Since
>> the data's distribution has changed, there would need to be an
>> Exchange operator. It is the Exchange operator that triggers the rule
>> to fire.
>>
>> There are some special cases. If the data is sorted as well as
>> partitioned (say because the local aggregate uses a sort-based
>> algorithm) we could maybe use a more efficient plan. And if the
>> partition key is the same as the aggregation key we don't need a
>> summarizing Aggregate, just a Union.
>>
>> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix
>> scenario, once the Aggregate has been pushed through the Exchange
>> (i.e. onto the drill-bit residing on the region server) we can then
>> push the DrillAggregate across the drill-to-phoenix membrane and make
>> it into a PhoenixServerAggregate that executes in the region server.
>>
>> Related issues:
>> * https://issues.apache.org/jira/browse/DRILL-3840
>> * https://issues.apache.org/jira/browse/CALCITE-751
>>
>> Julian
>>
>
>

Re: Partial aggregation in Drill-on-Phoenix

Posted by Jacques Nadeau <ja...@dremio.com>.
Right now this type of work is done here:

https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java

With Distribution Trait application here:
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java

To me, the easiest way to solve the Phoenix issue is by providing a rule
that matches HashAgg and StreamAgg but requires Phoenix convention as
input. It would replace everywhere but would only be plannable when it is
the first phase of aggregation.
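
As an illustration of what "first phase" means here, the following is a
minimal Python sketch of two-phase aggregation. It is not Drill or Phoenix
code: the region names, rows, and function names are all hypothetical. The
first phase runs locally per region and emits sub-totals; the second phase
runs after the exchange and combines them.

```python
from collections import defaultdict

# Hypothetical rows (group_key, value) held by each region server.
regions = {
    "rs1": [("a", 1), ("b", 2), ("a", 3)],
    "rs2": [("a", 10), ("c", 5)],
    "rs3": [("b", 7), ("c", 1), ("c", 4)],
}


def partial_aggregate(rows):
    """Phase 1: runs locally on one region, emits (sum, count) per key."""
    acc = defaultdict(lambda: [0, 0])
    for key, value in rows:
        acc[key][0] += value
        acc[key][1] += 1
    return {key: tuple(totals) for key, totals in acc.items()}


def final_aggregate(partials):
    """Phase 2: runs after the exchange, combines sub-totals per key."""
    acc = defaultdict(lambda: [0, 0])
    for partial in partials:
        for key, (subtotal, count) in partial.items():
            acc[key][0] += subtotal
            acc[key][1] += count
    return {
        key: {"sum": s, "count": c, "avg": s / c}
        for key, (s, c) in acc.items()
    }


partials = [partial_aggregate(rows) for rows in regions.values()]
result = final_aggregate(partials)
print(result)
```

Note that the average is carried as a (sum, count) pair through the first
phase, since averaging the per-region averages would give the wrong answer
when regions hold different numbers of rows.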

Thoughts?



--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org> wrote:

> Phoenix is able to perform quite a few relational operations on the
> region server: scan, filter, project, aggregate, sort (optionally with
> limit). However, the sort and aggregate are necessarily "local". They
> can only deal with data on that region server, and there needs to be a
> further operation to combine the results from the region servers.
>
> The question is how to plan such queries. I think the answer is an
> AggregateExchangeTransposeRule.
>
> The rule would spot an Aggregate on a data source that is split into
> multiple locations (partitions) and split it into a partial Aggregate
> that computes sub-totals and a summarizing Aggregate that combines
> those totals.
>
> How does the planner know that the Aggregate needs to be split? Since
> the data's distribution has changed, there would need to be an
> Exchange operator. It is the Exchange operator that triggers the rule
> to fire.
>
> There are some special cases. If the data is sorted as well as
> partitioned (say because the local aggregate uses a sort-based
> algorithm) we could maybe use a more efficient plan. And if the
> partition key is the same as the aggregation key we don't need a
> summarizing Aggregate, just a Union.
>
> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix
> scenario, once the Aggregate has been pushed through the Exchange
> (i.e. onto the drill-bit residing on the region server) we can then
> push the DrillAggregate across the drill-to-phoenix membrane and make
> it into a PhoenixServerAggregate that executes in the region server.
>
> Related issues:
> * https://issues.apache.org/jira/browse/DRILL-3840
> * https://issues.apache.org/jira/browse/CALCITE-751
>
> Julian
>