You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/05/13 00:05:39 UTC

[2/3] phoenix git commit: PHOENIX-2876 Using aggregation function in ORDER BY (Sergey Soldatov)

PHOENIX-2876 Using aggregation function in ORDER BY (Sergey Soldatov)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/af8d3b65
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/af8d3b65
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/af8d3b65

Branch: refs/heads/master
Commit: af8d3b65c84fd57b91b99ff36de2194149c5a94e
Parents: d414505
Author: James Taylor <ja...@apache.org>
Authored: Thu May 12 14:32:34 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 12 14:32:34 2016 -0700

----------------------------------------------------------------------
 .../phoenix/compile/AggregationManager.java     | 60 ++++++++++++++++++++
 .../apache/phoenix/compile/DeleteCompiler.java  |  1 +
 .../apache/phoenix/compile/PostDDLCompiler.java |  1 +
 .../phoenix/compile/ProjectionCompiler.java     | 53 +----------------
 .../apache/phoenix/compile/QueryCompiler.java   |  1 +
 .../apache/phoenix/compile/UpsertCompiler.java  |  6 +-
 .../phoenix/compile/QueryCompilerTest.java      | 16 +++++-
 7 files changed, 86 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
index ee2497b..c8e672e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/AggregationManager.java
@@ -17,7 +17,21 @@
  */
 package org.apache.phoenix.compile;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.expression.visitor.SingleAggregateFunctionVisitor;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 /**
  * 
@@ -52,4 +66,50 @@ public class AggregationManager {
     public void setAggregators(ClientAggregators clientAggregator) {
         this.aggregators = clientAggregator;
     }
+    /**
+     * Compiles the aggregators for the statement by:
+     * 1) Collecting each distinct aggregate function from the expressions registered with the
+     *    expression manager, so aggregates used only in ORDER BY are included.
+     * 2) Reordering aggregation functions (by putting fixed length aggregates first) to
+     *    optimize the positional access of the aggregated value.
+     */
+    public void compile(StatementContext context, GroupByCompiler.GroupBy groupBy) throws
+            SQLException {
+        final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
+
+        Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
+        while (expressions.hasNext()) {
+            Expression expression = expressions.next();
+            expression.accept(new SingleAggregateFunctionVisitor() {
+                @Override
+                public Iterator<Expression> visitEnter(SingleAggregateFunction function) {
+                    aggFuncSet.add(function);
+                    return Iterators.emptyIterator();
+                }
+            });
+        }
+        if (aggFuncSet.isEmpty() && groupBy.isEmpty()) {
+            return;
+        }
+        List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet);
+        Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR);
+
+        int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
+        context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
+        ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
+        context.getAggregationManager().setAggregators(clientAggregators);
+    }
+
+    private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
+        int minNullableIndex = aggFuncs.size();
+        for (int i = 0; i < aggFuncs.size(); i++) {
+            SingleAggregateFunction aggFunc = aggFuncs.get(i);
+            if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
+                minNullableIndex = i;
+                break;
+            }
+        }
+        return minNullableIndex;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 2a97686..fa3dd62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -508,6 +508,7 @@ public class DeleteCompiler {
                 // Ignoring ORDER BY, since with auto commit on and no limit makes no difference
                 SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
                 RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+                context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
                 if (plan.getProjector().projectEveryRow()) {
                     projectorToBe = new RowProjector(projectorToBe,true);
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index a786438..e43b596 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -253,6 +253,7 @@ public class PostDDLCompiler {
                             }
                             // Need to project all column families into the scan, since we haven't yet created our empty key value
                             RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
+                            context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
                             // Explicitly project these column families and don't project the empty key value,
                             // since at this point we haven't added the empty key value everywhere.
                             if (columnFamilies != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 3cf3934..8d7d7cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -188,8 +188,8 @@ public class ProjectionCompiler {
         try {
         	dataTable = conn.getTable(new PTableKey(tenantId, tableName));
         } catch (TableNotFoundException e) {
-            if (tenantId != null) { 
-            	// Check with null tenantId 
+            if (tenantId != null) {
+                // Check with null tenantId
             	dataTable = conn.getTable(new PTableKey(null, tableName));
             }
             else {
@@ -483,8 +483,6 @@ public class ProjectionCompiler {
                 }
             }
         }
-        
-        selectVisitor.compile();
         boolean isProjectEmptyKeyValue = false;
         if (isWildcard) {
             projectAllColumnFamilies(table, scan);
@@ -576,18 +574,7 @@ public class ProjectionCompiler {
     }
 
     private static class SelectClauseVisitor extends ExpressionCompiler {
-        private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
-            int minNullableIndex = aggFuncs.size();
-            for (int i = 0; i < aggFuncs.size(); i++) {
-                SingleAggregateFunction aggFunc = aggFuncs.get(i);
-                if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
-                    minNullableIndex = i;
-                    break;
-                }
-            }
-            return minNullableIndex;
-        }
-        
+
         /**
          * Track whether or not the projection expression is case sensitive. We use this
          * information to determine whether or not we normalize the column name passed
@@ -613,40 +600,6 @@ public class ProjectionCompiler {
             reset();
         }
 
-
-        /**
-         * Compiles projection by:
-         * 1) Adding RowCount aggregate function if not present when limiting rows. We need this
-         *    to track how many rows have been scanned.
-         * 2) Reordering aggregation functions (by putting fixed length aggregates first) to
-         *    optimize the positional access of the aggregated value.
-         */
-        private void compile() throws SQLException {
-            final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
-    
-            Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
-            while (expressions.hasNext()) {
-                Expression expression = expressions.next();
-                expression.accept(new SingleAggregateFunctionVisitor() {
-                    @Override
-                    public Iterator<Expression> visitEnter(SingleAggregateFunction function) {
-                        aggFuncSet.add(function);
-                        return Iterators.emptyIterator();
-                    }
-                });
-            }
-            if (aggFuncSet.isEmpty() && groupBy.isEmpty()) {
-                return;
-            }
-            List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet);
-            Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR);
-    
-            int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
-            context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
-            ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
-            context.getAggregationManager().setAggregators(clientAggregators);
-        }
-        
         @Override
         public void reset() {
             super.reset();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 82c9731..4488aff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -564,6 +564,7 @@ public class QueryCompiler {
         RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where);
         OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, offset, projector,
                 groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder);
+        context.getAggregationManager().compile(context, groupBy);
         // Final step is to build the query plan
         if (!asSubquery) {
             int maxRows = statement.getMaxRows();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7c6347f..e2fc2ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -636,7 +636,11 @@ public class UpsertCompiler {
                     PTable projectedTable = PTableImpl.makePTable(table, projectedColumns);
                     
                     SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
-                    RowProjector aggProjectorToBe = ProjectionCompiler.compile(queryPlan.getContext(), select, GroupBy.EMPTY_GROUP_BY);
+                    StatementContext statementContext = queryPlan.getContext();
+                    RowProjector aggProjectorToBe = ProjectionCompiler.compile(statementContext, select, GroupBy
+                            .EMPTY_GROUP_BY);
+                    statementContext.getAggregationManager().compile(queryPlan.getContext()
+                            ,GroupBy.EMPTY_GROUP_BY);
                     if (queryPlan.getProjector().projectEveryRow()) {
                         aggProjectorToBe = new RowProjector(aggProjectorToBe,true);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af8d3b65/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 4b756fa..1db90a9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -2275,5 +2275,19 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             conn.close();
         }
     }
-
+    @Test
+    public void testOrderByWithNoProjection() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            conn.createStatement().execute("create table x (id integer primary key, A.i1 integer," +
+                    " B.i2 integer)");
+            Scan scan = projectQuery("select A.i1 from X group by i1 order by avg(B.i2) " +
+                    "desc");
+            ServerAggregators aggregators = ServerAggregators.deserialize(scan.getAttribute
+                    (BaseScannerRegionObserver.AGGREGATORS), null);
+            assertEquals(2,aggregators.getAggregatorCount());
+        } finally {
+            conn.close();
+        }
+    }
 }